以前从来没去研究过安卓apk脱壳之类的,这次正好有机会和时间,所以也就上手研究了下,从入门到猝死,每天睡四五个小时,其他时间全部在研究,脑壳都给干冒烟了,主要是入手后脑子就停不下来,导致无法安心睡眠,不过期间学习到了挺多的知识的,只能说痛并快乐着。。。
目的:
泡泡聊天APP(MosTalk)最近突然火起来了,同时也引来了很多炸群广告,用晚上开始全体群禁言的方式显得很呆,考虑写个脚本全自动检测所有群,发现炸群者直接踢出并撤回消息的辅助脚本,所以也就有了这次的研究。
查壳后发现用了最新的360加固,于是在脱壳上面废了几天时间,主要是不会IDA分析内存,所以只有各种查资料去找别人的工具了,最后通过 frida-unpack 成功脱壳,具体可以看上篇文章:初入APK逆向破解小记 这里就不过多阐述了。
脱壳后的代码是 Java 代码,拿着自己半吊子的功底在加上 open AI ,倒是也能做到审计了。
思路1:通过python+frida hook 最新消息接受方法
泡泡聊天APP其实是存在WEB聊天页面的,只是它聊天是走的websocket,对websocket并不了解,所以不知道怎么监听到最新的群消息,只能想到这么个思路了。
尝试半天,总算写好了代码,测试了下发现存在个问题,会遗漏消息,并不能把所有的消息全部撤回;
在这里卡了一段时间,最开始以为是消息接受处理存在问题,所以又改成了接受消息存本地数据库,然后从数据库取出消息在处理,最后无奈发现因为python调用frida hook的原因,是必然会遗漏些频率高的消息的,不管用不用数据库。。。
虽然遗漏一些消息,但是只要能实现检测到频繁者,能撤回绝大部分消息并自动踢人,其实也算完工了,但是我这该死的完美主义,消息遗漏+配合模拟器ADB这种繁琐的方式,让我这个完美主义者不能忍,所以最终 PASS 掉了这个方法。
这里贴下当时的代码把,可能存在一些bug,毕竟是个半成品:
1
import
frida,json,re,time,datetime,requests,hashlib,threading,os,queue
2
import
jaydebeapi
3
from
collections
import
defaultdict
4
5
def
md5_hash
(text)
:
6
md5_hasher = hashlib.md5()
7
md5_hasher.update(text.encode(
'utf-8'
))
8
hashed_text = md5_hasher.hexdigest()
9
10
return
hashed_text
11
12
def
md5Psw
(account, pwd)
:
13
hashed_pwd = md5_hash( md5_hash(pwd) + account +
'MOSGRAM'
)
14
return
hashed_pwd
15
16
def
remove_special_characters
(input_string)
:
17
try
:
18
# 定义特殊字符的正则表达式模式
19
special_char_pattern = re.compile(
'[^x00-x7F]+'
)
20
21
# 使用正则表达式替换特殊字符为空字符串
22
cleaned_string = special_char_pattern.sub(
''
, input_string)
23
24
return
cleaned_string
25
except
Exception
as
e:
26
print(
"[!] 异常发生:"
, e)
27
return
input_string
28
29
def
data_to_json
(data)
:
30
try
:
31
pattern =
r'(w+)s*=s*([^,]+)'
32
33
result = []
34
records = data.split(
'], ['
)
35
36
for
record
in
records:
37
match = re.findall(pattern, record)
38
record_dict = {}
39
for
key, value
in
match:
40
if
value.startswith(
'{'
)
and
value.endswith(
'}'
):
41
record_dict[key] = json.loads(value)
42
else
:
43
record_dict[key] = value
44
result.append(record_dict)
45
46
parsed_data = json.dumps(result, indent=
4
)
47
return
parsed_data
48
except
json.JSONDecodeError
as
json_error:
49
print(
"[!] 数据转换失败: JSON 解析错误:"
, json_error)
50
except
Exception
as
e:
51
print(
"[!] 数据转换失败:"
, e)
52
return
None
53
54
def
custom_request
(method, url, json_data=None, headers=None)
:
55
try
:
56
if
method ==
'POST'
:
57
response = requests.get(url, headers=headers)
58
elif
method ==
'POST'
:
59
response = requests.post(url, json=json_data, headers=headers)
60
elif
method ==
'DELETE'
:
61
response = requests.delete(url, json=json_data, headers=headers)
62
else
:
63
raise
ValueError(
"[!] 不支持的请求方法: "
+ method)
64
65
return
response.json()
66
except
requests.exceptions.RequestException
as
e:
67
print(
"[!] 网络请求异常:"
, e)
68
return
None
69
70
71
def
usb_device
()
:
72
try
:
73
# 连接安卓机上的frida-server
74
device = frida.get_usb_device(
1000
)
75
# 启动app
76
pid = device.spawn([
"com.paopaotalk.im"
])
77
device.resume(pid)
78
session = device.attach(pid)
79
return
session
80
except
:
81
exit(
"[!] 启动应用APP出错,重新执行脚本!"
)
82
83
84
# 消息回调函数,处理从脚本发送的消息
85
def
on_message
(message, payload)
:
86
try
:
87
global
call_back_message
88
if
message[
'type'
] ==
'send'
:
89
call_back_message = message[
'payload'
]
90
return
call_back_message
91
except
:
92
exit(
"[!] 消息回调失败,重新执行脚本!"
)
93
94
def
hook_function
(session)
:
95
try
:
96
global
script
97
98
if
script:
99
script.unload()
# 释放之前的脚本
100
101
script_source =
"""
102
Java.perform(function()
103
{
104
var getVal = Java.use('v5.g');
105
var cons = getVal.l();
106
var send_it = cons.toString();
107
send(send_it);
108
});
109
"""
110
script = session.create_script(script_source)
111
script.on(
'message'
, on_message)
112
script.load()
113
return
script
114
except
:
115
exit(
"[!] 注入脚本失败,重新执行脚本!"
)
116
117
#登录
118
def
login
(username,pwd)
:
119
try
:
120
print(
"[-] 正在登录账号..."
)
121
login_data_json = {
"username"
:
"aaa"
,
"password"
:
"bbb"
,
"device_type"
:
"WEB"
}
122
login_data_json[
"username"
] = username
123
login_data_json[
"password"
] = md5Psw(username, pwd)
124
login_header_json = {
"Content-Type"
:
"application/json; charset=UTF-8"
,
"X-Requested-With"
:
"XMLHttpRequest"
,
"App_id"
:
"qiyunxin"
,
"timestamp"
: str(int(time.time() *
1000
)),
"sign"
: md5_hash(str(int(time.time() *
1000
)) +
"WEB"
),}
125
logins = custom_request(
'POST'
, url=
'http://api.vvchat.im/userservices/v2/user/login'
, json_data=login_data_json, headers=login_header_json)
126
127
if
logins[
'token'
]
and
logins[
'open_id'
]:
128
print(
"[!] 登录账号成功!"
)
129
return
logins
130
except
:
131
exit(
"[!] 登录失败,重新执行脚本!"
)
132
133
#数据转换集合
134
def
zhuanhuan
(data)
:
135
data_json = json.loads(data_to_json(call_back_message))
136
return
data_json
137
138
def
tiren
(chat_name, from_cust_id)
:
139
if
chat_name
in
groups_list:
140
open_id = custom_request(
'GET'
, url=
f"http://api.vvchat.im/groupapi/v1/users/
{logins[
'open_id'
]}
/groups/
{groups_list[chat_name[
1
]]}
/members?page_size=100000"
, headers={
"Content-Type"
:
"application/json; charset=UTF-8"
,
"Authorization"
: logins[
'token'
]})
141
142
if
open_id:
143
if
chs[
'err_msg'
] ==
'OK'
:
144
print(
f"[-] 群消息
{row[
0
]}
撤回成功!"
)
145
delete_data(
f"DELETE FROM data WHERE msg_no = '
{row[
0
]}
' AND chat_name = '
{row[
1
]}
';"
)
146
147
else
:
148
print(
f"[!] 群消息
{row[
0
]}
撤回失败,原因:
{chs[
'err_msg'
]}
请检查..."
)
149
delete_data(
f"DELETE FROM data WHERE msg_no = '
{row[
0
]}
' AND chat_name = '
{row[
1
]}
';"
)
150
151
else
:
152
msg_no_queue.put(row)
153
print(
f"[!] 群消息
{row[
0
]}
撤回失败,已重新添加到列队!"
)
154
155
156
#持续不间断接受新数据并转存到列队
157
def
run1
()
:
158
global
unique_data_set
159
160
while
True
:
161
hook_function(session)
162
if
call_back_message:
163
call_back_message_json = zhuanhuan(remove_special_characters(call_back_message))
164
165
for
item
in
call_back_message_json:
166
if
groups
in
item.get(
"chat_name"
,
""
)
and
item.get(
"chat_type"
,
""
) ==
"2"
:
167
168
data = (item.get(
"chat_name"
,
""
),item.get(
"msg_no"
,
""
),item.get(
"from_cust_id"
,
""
),item.get(
"content"
,
""
),item.get(
"msg_time"
,
""
))
169
170
if
item.get(
"content"
,
""
) !=
'{0}开启群禁言'
and
item.get(
"content"
,
""
) !=
'{0}撤回了一条消息'
and
item.get(
"content"
,
""
) !=
''
and
data
not
in
unique_data_set:
171
ls_queue.put(data)
172
unique_data_set.add(data)
173
174
time.sleep(
0.0001
)
175
176
#从列表中提取数据判断是否存在
177
def
run2
()
:
178
while
True
:
179
with
search_queue_lock:
180
while
not
ls_queue.empty():
181
data_to_insert = ls_queue.get()
# 从队列取出数据
182
183
search_msg_no = query_data(
f"select id from data where msg_no = '
{data_to_insert[
2
]}
'"
)
184
185
if
not
search_msg_no:
186
insert_queue.put(data_to_insert)
187
188
time.sleep(
0.001
)
189
190
#将insert_queue存储的最新发信信息插入到数据库中
191
def
run3
()
:
192
while
True
:
193
with
search_queue_lock:
194
while
not
insert_queue.empty():
195
data_to_insert = insert_queue.get()
# 从队列取出数据
196
print(
"[-] 新数据:"
,data_to_insert)
197
insert_data(data_to_insert)
198
199
time.sleep(
0.001
)
200
201
#搜索数据库,找出违规者所有发信MSG_NO编号,加入到列队中
202
def
run4
()
:
203
while
True
:
204
with
search_queue_lock:
205
if
msg_no_queue.empty()
and
insert_queue.empty()
and
ls_queue.empty():
206
#数据赛选,提取违规者发的全部信息 MSG_NO编号
207
select_query =
"""
208
WITH UniqueChatNames AS (
209
SELECT chat_name, from_cust_id FROM DATA
210
WHERE
211
id IN ( SELECT MIN(id) FROM
212
(SELECT id,chat_name,from_cust_id,content,SUBSTRING(msg_time, 1, 9) FROM DATA
213
WHERE
214
(chat_name,from_cust_id,content,SUBSTRING(msg_time, 1, 9)) IN (
215
SELECT chat_name,from_cust_id,content,SUBSTRING(msg_time, 1, 9) FROM DATA
216
GROUP BY chat_name,from_cust_id,content,SUBSTRING(msg_time, 1, 9)
217
HAVING COUNT(*) > 1
218
)
219
) grouped_data
220
GROUP BY chat_name, from_cust_id
221
)
222
) SELECT STRINGDECODE (d.msg_no), STRINGDECODE (d.chat_name)
223
FROM
224
(
225
SELECT *, ROW_NUMBER () OVER ( PARTITION BY chat_name, from_cust_id, msg_no ORDER BY id ) AS rn FROM DATA
226
) d JOIN UniqueChatNames subquery ON d.chat_name = subquery.chat_name AND d.from_cust_id = subquery.from_cust_id
227
WHERE
228
d.rn = 1;
229
"""
230
231
results = query_data(select_query)
232
233
if
results:
#列表类型
234
msg_no_queue.put(results)
235
print(
f"[!] 已检测到
{len(results)}
条违规信息."
)
236
237
time.sleep(
0.001
)
238
239
#提取msg_no编号,并撤回这些编号消息
240
def
run5
()
:
241
#登录
242
logins = login(username,pwd)
243
while
True
:
244
with
search_queue_lock:
245
while
not
msg_no_queue.empty():
246
msg_no_list = msg_no_queue.get()
247
248
for
row
in
msg_no_list:
249
if
row[
1
]
in
groups_list:
250
251
data = {
"msg_no"
: row[
0
],
"msg_time"
: int(time.time() *
1000
)}
252
253
chs = custom_request(
'POST'
, url=
f"http://api.vvchat.im/groupapi/v1/users/
{logins[
'open_id'
]}
/groups/
{groups_list[row[
1
]]}
/revoke_msg"
, json_data=data, headers={
"Content-Type"
:
"application/json; charset=UTF-8"
,
"Authorization"
: logins[
'token'
]})
254
255
if
chs:
256
if
chs[
'err_msg'
] ==
'OK'
:
257
print(
f"[-] 群消息
{row[
0
]}
撤回成功!"
)
258
delete_data(
f"DELETE FROM data WHERE msg_no = '
{row[
0
]}
' AND chat_name = '
{row[
1
]}
';"
)
259
260
else
:
261
print(
f"[!] 群消息
{row[
0
]}
撤回失败,原因:
{chs[
'err_msg'
]}
请检查..."
)
262
delete_data(
f"DELETE FROM data WHERE msg_no = '
{row[
0
]}
' AND chat_name = '
{row[
1
]}
';"
)
263
264
else
:
265
msg_no_queue.put(row)
266
print(
f"[!] 群消息
{row[
0
]}
撤回失败,已重新添加到列队!"
)
267
else
:
268
exit(
"[!] 违规所在群 ID 获取失败, 代码中添加该群ID!"
)
269
270
time.sleep(
0.001
)
271
272
#5分钟清理一次数据库
273
def
run6
()
:
274
global
unique_data_set
275
while
True
:
276
time.sleep(
300
)
# 五分钟
277
unique_data_set.clear()
278
delete_data(
"TRUNCATE TABLE data;"
)
#清空数据库
279
280
# 创建数据库连接
281
def
create_connection
()
:
282
conn = jaydebeapi.connect(
"org.h2.Driver"
, h2_jdbc_url, [h2_user, h2_password], h2_driver_path)
283
return
conn
284
285
# 创建表
286
def
create_table
()
:
287
conn = create_connection()
288
create_table_sql =
"""
289
CREATE TABLE IF NOT EXISTS data (
290
id BIGINT AUTO_INCREMENT PRIMARY KEY,
291
chat_name TEXT NOT NULL,
292
msg_no TEXT NOT NULL,
293
from_cust_id TEXT NOT NULL,
294
content TEXT NOT NULL,
295
msg_time BIGINT NOT NULL
296
)
297
"""
298
299
cursor = conn.cursor()
300
try
:
301
cursor.execute(create_table_sql)
302
conn.commit()
303
except
jaydebeapi.Error
as
e:
304
print(
"创建表异常:"
, e)
305
cursor.close()
306
return
False
307
finally
:
308
cursor.close()
309
310
return
True
311
312
# 插入数据
313
def
insert_data
(data)
:
314
insert_query =
'''
315
INSERT INTO data (chat_name, msg_no, from_cust_id, content, msg_time) VALUES (?, ?, ?, ?, ?);
316
'''
317
318
try
:
319
conn = create_connection()
320
cursor = conn.cursor()
321
cursor.execute(insert_query, data)
322
conn.commit()
323
except
jaydebeapi.Error
as
e:
324
print(
"[!] 插入失败:"
, e)
325
finally
:
326
cursor.close()
327
328
# 查询数据
329
def
query_data
(sql)
:
330
try
:
331
conn = create_connection()
332
cursor = conn.cursor()
333
cursor.execute(sql)
334
results = cursor.fetchall()
335
except
jaydebeapi.Error
as
e:
336
print(
"[!] 查询失败:"
, e)
337
finally
:
338
cursor.close()
339
340
return
results
341
342
def
delete_data
(sql)
:
343
try
:
344
conn = create_connection()
345
cursor = conn.cursor()
346
cursor.execute(sql)
347
conn.commit()
348
except
jaydebeapi.Error
as
e:
349
print(
"[!] 查询失败:"
, e)
350
finally
:
351
cursor.close()
352
353
if
__name__ ==
"__main__"
:
354
################################################
355
username =
'0086'
356
pwd =
''
357
groups =
'关键词'
#需要踢人的群名字统一关键词,也可以是一个字母符号,如果群名字不带这个则不会踢人
358
groups_list = {
359
"aaa测试3"
:
"pUalcqhBTcdg44G_j9s5K-XCUF83FRHB"
,
360
"aaa测试1"
:
"928JLh8AAA65RyryJLbTiPKnV0NJWXIi"
,
361
"aaa测试2"
:
"Y26osJNBCaOMENv71XMT7CmOT1hYhee9"
362
}
363
################################################
364
365
# 全局变量
366
call_back_message =
None
367
script =
None
# 用于存储已加载的脚本
368
unique_data_set = set()
369
370
# 创建队列
371
ls_queue = queue.Queue()
372
insert_queue = queue.Queue()
373
msg_no_queue = queue.Queue()
374
375
# 创建锁
376
search_queue_lock = threading.Lock()
377
378
# 连接参数
379
h2_jdbc_url =
"jdbc:h2:./test"
380
h2_user =
"test"
381
h2_password =
"test"
382
h2_driver_path =
"h2-1.4.200.jar"
383
384
385
if
not
os.path.exists(os.path.expanduser(
"./test.mv.db"
)):
386
# 数据库文件不存在,创建数据库和表
387
if
create_table():
388
print(
"数据库表创建成功!"
)
389
else
:
390
os.remove(
"./test.mv.db"
)
391
exit(
"[!] 数据表创建失败,退出代码!"
)
392
393
print(
"[-] 正在启动应用APP..."
)
394
session = usb_device()
395
time.sleep(
8
)
#APP启动有加载时间
396
397
try
:
398
print(
"[-] 检测状态: 正在检测信息..."
)
399
# 启动线程模拟函数的同时运行
400
threading.Thread(target=run1).start()
401
threading.Thread(target=run2).start()
402
threading.Thread(target=run3).start()
403
threading.Thread(target=run4).start()
404
threading.Thread(target=run5).start()
405
threading.Thread(target=run6).start()
406
except
KeyboardInterrupt:
407
script.unload()
408
print(
"[-] 脚本已卸载,程序退出。"
)
思路2:通过调用APP本地数据库
在审计源码过程中发现泡泡APP存在本地数据库,将最新的消息全部存在本地数据库中,当时以为终于找到出路了,通过sql语句调用能非常简单快速的达到我要的效果。
奈何困难总比办法多,在数据库上又卡住了;
数据库使用了sqlcipher 4.5.3 加密,在尝试用pysqlcipher3库打开sqlite数据库始终连不上,查阅半天资料发现pysqlcipher3库不支持sqlcipher 4.5.3 加密的sqlite数据库;
后又尝试本地编译sqlcipher失败,搭建环境各种报错,百度谷歌都快翻烂了都没解决,这里不得不吐槽下,为什么这么不容易入手的加密方式还这么出名和流行?就因为它难入手吗。。。
这里不得不提一下,后来换思路的时候才看到官方提供了windows编译的具体步骤教程,可惜当时已经对它彻底失望了,所以没去尝试,不过我感觉最终还是会编译失败。。。
后面通过 DB Browser 和 SQLiteStudio 2款sqlit数据库管理工具连上了数据库;
然后又查阅资料,python应该如何依赖这些工具来调用数据库呢?
2款工具都不存在API,只能考虑命令行了;
DB Browser 不存在Windows下命令行,不过可以在Linux下安装,所以使用 Termux 命令执行APP在安卓系统安装,结果发现源找不到库DB Browser的库,更换源也没辙,官方也没有提供Linux下编译安装包。
SQLiteStudio 直接不支持命令行,不过发现它可以直接通过adb的方式连接上APP中的数据库,但是因为没法通过python调用,所以最后pass掉了。
思路3:从socket上面入手
最开始的时候还是满脑子盯着安卓代码研究,看它消息接受是怎么接受的,最后定位到了它socket接受消息的方法;
通过hook发现一个蛋疼的事情,每次消息接受scoket的IP端口都在变化,当时不懂是为什么,后听一哥们说是中转服务器,任意连一个就行了;
尝试使用python连接这些IP端口总是不成功,请教那哥们后那哥们告诉我用web端的websocket,他通过websocket网页在线连接测试成功连接上,所以我最后又将目光放在了WEB端的websocket上了。
0
4
研究 websocket
通过各种查阅资料,去了解websocket的原理、流程等方面资料后,开始尝试用python连接;
连上以后并没有收到任何数据,隔一段时间连接就断开了,这也是我最开始不考虑webscoket的原因,因为不懂不了解;
后面尝试网上的发送心跳保持长连接活跃的方法,发现还是没屁用,收不到数据,20秒断开连接。
卡了几天后,开始研究WEB端的JS代码,看他的JS是如何连接处理websocket接受数据的,
JS连接webscoket代码:https://im.mlskm.com/js/aiti-js-sdk-1.0.js
经过分析发现他连上webscoket后先构造登录数据包登录,登录后每20秒发送一次自定义构造的心跳数据包,通过查看它打印的控制台日志也是如此流程,这也是我收不到数据和断开连接的原因;
最后通过一点点根据JS代码来构造数据包,成功使用python连接登录并接受到了数据,本以为终于可以告别这次苦逼的研究了,奈何又遇到了新问题,20秒自动断开连接!!
来回检查代码对比代码,发送心跳后服务端返回心跳正常,并且心跳发送也是10秒发一次,为什么20秒还是自动断开了?
这个问题卡了我两三天,最后成功的时候才发现,python 的 websockets 库就是个垃圾,垃圾中的超级垃圾,坑死我了,我不知道它为什么会20秒断开,看看AI怎么评价 websockets 的:
0
5
使用 websocket 库成功搞定
最后把websockets库换成websocket库成功保持长连接活跃,也能持续接受新消息,只能说非常Nice,剩下的就是处理和优化了,贴上最终代码和截图:
代码:
1
def
calc_checksum
(buffer, buffer_len)
:
2
if
buffer
is
None
:
3
return
[
0
,
0
,
0
,
0
]
4
5
tmp = [
0
,
0
,
0
,
0
]
6
dest = [
0
,
0
,
0
,
0
]
7
len =
0
8
offset =
0
9
10
while
len < buffer_len:
11
if
len +
4
> buffer_len:
12
offset = buffer_len - len
13
else
:
14
offset =
4
15
16
for
i
in
range(offset):
17
tmp[i] ^= buffer[len + i]
18
19
len +=
4
20
21
for
i
in
range(
4
):
22
dest[i] = tmp[i]
23
24
return
dest
25
26
def
split_cust_id
(cust_id)
:
27
result = [
0
] *
8
28
29
i =
0
30
while
cust_id >
0
:
31
temp = cust_id
32
cust_id = cust_id //
256
33
result[i] = temp - cust_id *
256
34
i +=
1
35
36
return
result
37
38
def
split_auth
(auth)
:
39
result = [ord(char)
for
char
in
auth]
40
if
len(result) <
32
:
41
for
i
in
range(len(result),
32
):
42
result.append(
0
)
43
return
result
44
45
def
md5_hash
(text)
:
46
md5_hasher = hashlib.md5()
47
md5_hasher.update(text.encode(
'utf-8'
))
48
hashed_text = md5_hasher.hexdigest()
49
50
return
hashed_text
51
52
def
md5Psw
(account, pwd)
:
53
hashed_pwd = md5_hash( md5_hash(pwd) + account +
'MOSGRAM'
)
54
return
hashed_pwd
55
56
def
login
(username,pwd)
:
57
global
open_id_qj,im_token,token
58
59
data = {
60
"username"
:
f"
{username}
"
,
61
"password"
: md5Psw(username, pwd),
62
"device_type"
:
"WEB"
63
}
64
65
headers = {
66
"Content-Type"
:
"application/json; charset=UTF-8"
,
67
"X-Requested-With"
:
"XMLHttpRequest"
,
68
"App_id"
:
"qiyunxin"
,
69
"timestamp"
: str(int(time.time() *
1000
)),
70
"sign"
: md5_hash(str(int(time.time() *
1000
)) +
"WEB"
),
71
"User-Agent"
:
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"
72
}
73
74
try
:
75
print(
'[信息] WEB端登录中...'
)
76
response = requests.post(
'http://api.vvchat.im/userservices/v2/user/login'
, headers=headers,data=json.dumps(data))
77
data = response.json()
78
79
if
'err_msg'
in
data:
80
exit(
'[信息] WEB端登录失败: %s!'
% data[
'err_msg'
])
81
82
elif
'open_id'
in
data:
83
print(
'[信息] WEB端成功!'
)
84
85
open_id_qj = data[
'open_id'
]
86
im_token = data[
'im_token'
]
87
token = data[
'token'
]
88
89
except
requests.exceptions.RequestException
as
e:
90
print(
'[信息] WEB端登录出错:%s'
%(e))
91
except
Exception
as
e:
92
print(
'[信息] WEB端登录异常报错:%s'
%(e))
93
94
def
revoke_msg
(msg_no,groups_id,group_name)
:
95
print(
f"[巡查] 开始撤回群
{group_name}
消息编号
{msg_no}
消息!"
)
96
data = {
"msg_no"
: msg_no,
"msg_time"
: int(time.time()) *
1000
}
97
headers = {
"Content-Type"
:
"application/json; charset=UTF-8"
,
"X-Requested-With"
:
"XMLHttpRequest"
,
"App_id"
:
"qiyunxin"
,
"User-Agent"
:
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"
,
"Authorization"
:token}
98
99
try
:
100
response = requests.post(
f'http://api.vvchat.im/groupapi/v1/users/
{open_id_qj}
/groups/
{groups_id}
/revoke_msg'
, headers=headers,data=json.dumps(data))
101
result = response.json()
102
103
if
result[
'err_msg'
] ==
'OK'
:
104
print(
f"[巡查] 群
{group_name}
消息编号
{msg_no}
撤回成功!"
)
105
else
:
106
print(
f"[巡查] 群
{group_name}
消息编号
{msg_no}
撤回失败,原因:
{result[
'err_msg'
]}
请检查..."
)
107
108
except
requests.exceptions.RequestException
as
e:
109
print(
'[巡查] 群 %s 消息编号 %s 撤回出错:%s ,重试一次...'
%(group_name,msg_no,e))
110
revoke_msg(msg_no,groups_id)
111
except
Exception
as
e:
112
print(
'[巡查] 群 %s 消息编号 %s 撤回异常报错:%s ,重试一次...'
%(group_name,msg_no,e))
113
revoke_msg(msg_no,groups_id)
114
115
def
delete_member
(from_cust_id,groups_id)
:
116
group_name = groups_name(groups_id)
117
print(
f"[巡查] 开始剔除群
{group_name}
成员
{from_cust_id}
"
)
118
from_cust_id_encrypt = encryptCustId(from_cust_id)
119
120
data = [{
"name"
:
"default"
,
"open_id"
: from_cust_id_encrypt,
"cust_id"
: from_cust_id}]
121
122
headers = {
"Content-Type"
:
"application/json; charset=UTF-8"
,
"X-Requested-With"
:
"XMLHttpRequest"
,
"App_id"
:
"qiyunxin"
,
"User-Agent"
:
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"
,
"Authorization"
:token}
123
124
try
:
125
response = requests.delete(
f'http://api.vvchat.im/groupapi/v1/users/
{open_id_qj}
/groups/
{groups_id}
/members'
, headers=headers,json=data)
126
result = response.json()
127
128
if
result[
'err_msg'
] ==
'OK'
:
129
print(
f"[巡查] 群
{group_name}
成员
{from_cust_id}
剔除成功!"
)
130
else
:
131
print(
f"[巡查] 群
{group_name}
成员
{from_cust_id}
剔除失败,原因:
{result[
'err_msg'
]}
请检查..."
)
132
133
except
requests.exceptions.RequestException
as
e:
134
print(
'[巡查] 群 %s 成员 %s 剔除出错:%s,重试一次...'
%(group_name,from_cust_id,e))
135
delete_member(from_cust_id,groups_id)
136
137
except
Exception
as
e:
138
print(
'[巡查] 群 %s 成员 %s 剔除异常报错:%s,重试一次...'
%(group_name,from_cust_id,e))
139
delete_member(from_cust_id,groups_id)
140
141
142
def
groups_name
(groups_id)
:
143
headers = {
"App_id"
:
"qiyunxin"
,
"User-Agent"
:
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36"
,
"Authorization"
:token}
144
145
try
:
146
response = requests.get(
f'http://api.vvchat.im/groupapi/v1/users/
{open_id_qj}
/groups/
{groups_id}
'
, headers=headers)
147
result = response.json()
148
149
if
result:
150
return
result[
'group_name'
]
151
152
except
requests.exceptions.RequestException
as
e:
153
print(
'[巡查] 获取群名称 出错:%s'
%(e))
154
groups_name(groups_id)
155
except
Exception
as
e:
156
print(
'[巡查] 获取群名称 异常报错:%s'
%(e))
157
groups_name(groups_id)
158
159
160
#登录包
161
def
wss_login_data
(from_cust_id,im_token)
:
162
from_cust_id = split_cust_id(int(decryptByDES(from_cust_id)))
163
auth = split_auth(im_token)
164
msg_no = next_message_num()
165
buffer = []
166
msg_type = [
1
,
0
]
167
body = []
168
body += login_type + from_cust_id + auth
169
checkCode = calc_checksum(body, len(body))
170
body += checkCode + end
171
buffer += start + length + version + msg_no + msg_type + body
172
len_body = len(body)
173
174
buffer[
1
] = len_body - ((len_body >>
16
) <<
16
)
175
buffer[
2
] = len_body >>
8
176
buffer[
3
] = len_body >>
16
177
buffer[
4
] =
0
178
179
return
buffer
180
181
#心跳包
182
def
wss_xt_data
(open_id)
:
183
buffer = []
184
from_cust_id = split_cust_id(int(decryptByDES(open_id)))
185
msg_no = next_message_num()
186
msg_type = [
6
,
0
]
187
body = []
188
body += login_type + from_cust_id
189
checkCode = calc_checksum(body, len(body))
190
body += checkCode + end
191
buffer += start + length + version + msg_no + msg_type + body
192
len_body = len(body)
193
194
buffer[
1
] = len_body - ((len_body >>
16
) <<
16
);
195
buffer[
2
] = len_body >>
8
;
196
buffer[
3
] = len_body >>
16
;
197
buffer[
4
] =
0
;
198
199
return
buffer
200
201
#消息检测
202
def
check_and_clean_data
()
:
203
global
msg_queue
204
wg_list = []
205
msg_list = []
206
207
while
True
:
208
if
not
msg_queue.empty():
209
for
_
in
range(
4
):
210
item = msg_queue.get()
211
msg_list.append(item)
212
213
print(
"[巡查] 巡查检测中..."
)
214
matching_data = [item
for
item
in
msg_list
if
item[
0
] == msg_list[
-1
][
0
]
and
item[
1
] == msg_list[
-1
][
1
]
and
item[
3
] == msg_list[
-1
][
3
]]
215
216
if
len(matching_data) >=
3
:
# 三者相同的存在大于等于三条
217
218
timestamps = [item[
4
]
for
item
in
matching_data]
#获取时间字段
219
time_diffs = [int(timestamps[i +
1
]) - int(timestamps[i])
for
i
in
range(len(timestamps) -
1
)]
220
221
for
diff, data
in
zip(time_diffs, matching_data[
1
:]):
222
if
diff ==
0
or
diff <=
3
:
223
for
x
in
msg_list:
224
if
data[
0
] == x[
0
]:
225
wg_list.append(x)
226
227
matching_data = []
228
229
if
wg_list:
230
# 使用集合去重,通过将子列表转换为元组
231
unique_list = list(set(tuple(sub_list)
for
sub_list
in
wg_list))
232
233
# 将元组转换回列表
234
unique_nested_list = [list(sub_tuple)
for
sub_tuple
in
unique_list]
235
236
from_cust_ids = set()
# 用于存储去重后的 from_cust_id
237
for
group
in
unique_nested_list:
238
if
group:
# 确保列表不为空
239
from_cust_ids.add((group[
0
], group[
1
]))
240
241
for
user
in
from_cust_ids:
242
#踢人
243
print(
f"[巡查] 检测到群:
{groups_name(user[
1
])}
出现违规者:
{user[
0
]}
!"
)
244
delete_member(x[
0
],x[
1
])
245
246
for
msg_no
in
msg_list:
247
for
i
in
from_cust_ids:
248
if
msg_no[
0
] == i[
0
]
and
msg_no[
1
] == i[
1
]:
249
#获取群名称
250
group_name = groups_name(msg_no[
1
])
251
#撤回
252
revoke_msg(msg_no[
2
],msg_no[
1
],group_name)
253
254
wg_list = []
255
msg_list = []
256
257
else
:
258
print(
"[巡查] 消息未检测到频繁!"
)
259
260
261
#服务端传输数据类型判断
262
def
eventReceiveMessage
(ws,msg)
:
263
global
msg_queue
264
msg_type = parse_number_data(msg[
14
:
16
])
265
status = parse_number_data(msg[
25
:
29
])
266
267
if
msg_type ==
2
:
268
if
status ==
200
:
269
print(
'[信息] websocket 端登录成功,开始监听信息...'
)
270
# 创建发送心跳包的线程
271
heartbeat_thread = threading.Thread(target=heartbeat, args=(ws,))
272
heartbeat_thread.daemon =
True
273
heartbeat_thread.start()
274
# 创建消息处理线程
275
msg_thread = threading.Thread(target=check_and_clean_data)
276
msg_thread.daemon =
True
277
msg_thread.start()
278
279
elif
status ==
401
:
280
print(
'[信息] websocket 端登录失败!'
)
281
ws.close()
282
283
elif
msg_type ==
5
:
284
chatType = parse_number_data(msg[
16
:
17
])
285
content = parse_string(msg[
61
:
61
+parse_number_data(msg[
57
:
61
])])
286
content_json = json.loads(content)[
'content'
]
287
msgNo = parse_number_data(msg[
6
:
14
])
288
groups_id = encryptCustId(parse_number_data(msg[
25
:
33
]))
289
msgTime = parse_number_data(msg[
17
:
25
])
290
from_cust_id = parse_number_data(msg[
33
:
41
])
291
292
data = [from_cust_id,groups_id,msgNo,content_json,msgTime]
293
294
if
chatType ==
2
:
295
print(
f"[消息] 发送者:
{from_cust_id}
发自群:
{groups_name(groups_id)}
发送时间:
{time.strftime(
'%Y-%m-%d %H:%M:%S'
, time.localtime(int(msgTime)))}
"
)
296
msg_queue.put(data)
297
298
elif
msg_type ==
6
:
299
print(
"[信息] websocket 端心跳正常!"
)
300
301
else
:
302
pass
303
304
def
heartbeat
(ws)
:
305
while
True
:
306
try
:
307
print(
f'[信息] 发送心跳推送...'
)
308
ws.send(bytes(wss_xt_data(open_id_qj)))
309
time.sleep(
15
)
310
except
:
311
print(
"[信息] 心跳推送失败!"
)
312
ws.close()
313
314
315
def
on_error
(ws, error)
:
316
print(
f"[错误]
{error}
"
)
317
318
def
on_close
(ws, close_status_code, close_msg)
:
319
print(
"[信息] websocket 端连接已断开,开始重新启动..."
)
320
run()
321
322
def
on_message
(ws, message)
:
323
eventReceiveMessage(ws,message)
324
325
def
on_open
(ws)
:
326
print(
"[信息] websocket 端已连接!"
)
327
login(username,password)
328
print(
"[信息] websocket 端登录中..."
)
329
ws.send(bytes(wss_login_data(open_id_qj,im_token)))
330
331
def
run
()
:
332
try
:
333
# 创建WebSocket连接
334
print(
"[信息] websocket 端连接状态检测..."
)
335
ws = websocket.WebSocketApp(url, on_message=on_message, on_open=on_open,on_error=on_error,on_close=on_close)
336
337
# 启动WebSocket连接
338
ws.run_forever()
339
except
:
340
print(
"[信息] websocket 端连接启动失败!"
)
341
342
if
__name__ ==
"__main__"
:
343
global
username,password
344
345
###############配置专区###############
346
username =
'0086'
#0086+手机号
347
password =
''
348
url =
"wss://im.mlskm.com/ws8"
349
###############配置专区###############
350
351
#全局变量定义
352
msg_queue = queue.Queue()
353
start = [
2
]
354
version = [
1
]
355
end = [
3
]
356
length = [
0
,
0
,
0
,
0
]
357
login_type = [
2
]
358
359
#启动
360
run()
截图:
最终实验效果如上,不会遗漏消息,并且支持多个群管理和支持全自动化(掉线重连),只需要挂着脚本啥也不需要管,就非常OK,搞定这一瞬间感觉整个人都活了,总花费差不多两周,太不容易了。。。
总结
入手学习到了逆向脱壳,以及 app hook 方面的知识。
对安卓中的Java代码更加熟悉。
入手了 sqlite 数据库及 sqlcipher 加密方式方面的知识。
入手了socket、websocket方面的知识。
对python更加熟练。
虽然整个过程非常痛苦,每天身体都是熬过夜的感觉,2个太阳穴也是非常的痛,不过结局还是好的,至少没白折腾,过程中也学习到了许多新知识点。
不过我还是希望下次我不会这么掘不会有这么大的研究心理了,不服老不行了,老了老了,还是保命要紧!
最后的最后,不得不吐槽一句,什么垃圾聊天软件,连防炸群的功能都舍不得开发一下!
END
![【实例剖析】记从逆向脱壳APP到写防炸群辅助工具 【实例剖析】记从逆向脱壳APP到写防炸群辅助工具]()
文:Mr.Wu
原文链接:https://mrwu.red/web/4198.html
版权声明:著作权归作者所有。如有侵权请联系删除
原文始发于微信公众号(开源聚合网络空间安全研究院):
【实例剖析】记从逆向脱壳APP到写防炸群辅助工具
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论