webshell检测算法实践

admin 2023年6月22日16:06:32评论11 views字数 18079阅读60分15秒阅读模式

本文首发于【i春秋论坛】

https://bbs.ichunqiu.com/thread-63425-1-1.html

分享你的技术,为安全加点温度~

  +

前言

在去年年末,笔者参与了阿里云WEBSHELL文本检测算法赛,于最后取得了第八名的成绩;本篇记录了笔者如何从 0开始学习相关知识并结合自身理解将检测算法的精度慢慢提升的过程。

PHP样本方案

本次题目训练数据给的是php和jsp样本的AST指令序列,如下的样式:

webshell检测算法实践
同时还附带了一些关于AST和污点传递的学习资料,感觉其预期想法是希望选手自己实现一套污点检测算法来做样本检测,但笔者对纯机器学习的检测方式较为感兴趣,一来是想借此学习下相关知识,二来是个人感觉处理ast相比来说研究成本更高;那么面对给到的ast数据我们如何来处理呢?
在处理之前笔者阅读了一下网上常见的一些关于机器学习检测webshell的文章和方案,找到了一个开源项目https://github.com/hooog/webshellDc ,首先笔者拿这个项目没做任何改动,将json数据不做处理直接作为了训练数据使用MLP进行训练,本地预留的测试数据基本准确率和召回率都在 95 以上,但在远程跑模型时,却拿到了下面的成绩。
"php_precision_score": 0.6631, "php_recall_score": 0.96195
可见准确率低的可怜,召回率很高,这说明我们找出的黑样本中掺杂了较多的白样本,因此造成了精确度下降,笔者觉得一方面是由于json中有很多无用的字符串会影响训练效果,另一方面题目远程的样本中存在着一部分变异的webshell导致模型无法准确识别,因此开始尝试对json进行特征提取和处理,首先对json进行数据清洗,去除一些无用的数据并格式化。
def read_file(filename):
    text = b""
    with open(filename, "rb"as f:
        for line in f:
            line = line.strip(b"rt")
            text += line
        result = re.compile('"name":.*?]', re.S)  # 正则匹配
        theresult = re.findall(result, str(text))
        text = ''.join(theresult)
        text = text.replace('['' ')
        text = text.replace(']'' ')
        text = text.replace('\'' ')
        text = text.replace(','' ')
        text = text.replace('"'' ')
        text = text.replace('name :'' ')
        text = re.sub('<(.*?)>','',text)
        text = re.sub('FROM `(.*?)`','',text)
        text = re.sub('INSERT INTO `(.*?))','',text)
        text = re.sub('UPDATE `(.*?)WHERE','',text)
        text = re.sub('REPLACE INTO `(.*?))','',text)
        text = re.sub('`(.*?)`=','',text)
        text = re.sub('SELECT `(.*?)`','',text)
        text = text.replace(':'' ')
        text = text.replace('STMT_LIST'' ')
        text = text.replace('ZVAL'' ')
        text = text.replace('NULL'' ')
        text = ' '.join(text.split()).encode("utf-8")
    return text
通过不断测试和筛选,选取了以下 13 种特征。
BINARY_OP 字符数量
key_num 敏感函数数量
passw 关键字数量
INCLUDE_OR_EVAL 关键字数量
FUNC_DECL 关键字数量
` 反引号数量
 eval  关键字数量
shell 关键字数量
capital_f 大写字母频率(用来辅助检测base64类型的字符串)
ARG_LIST 关键字数量
 CALL 关键字数量
文本总长度
文本最长字符串长度
其中敏感函数包含以下这些:
str_rot13,serialize,eval,base64_decode,strrev,assert,file_put_contents,fwrite,curl_exec,passthru,exec,dl,readlink,popepassthru,preg_replace,create_function,array_map,call_user_func,array_filter,usort,stream_socket_server,pcntl_exec,system,chroot,scandir,chgrp,shell_exec,proc_open,proc_get_status,popen,ini_alter,ini_restore,ini_set,LD_PRELOAD,_GET,_POST,_COOKIE,_FILE,phpinfo,_SERVER
训练使用StackingClassifier集成五种基本类器(AdaBoost、GradientBoosting、RandomForest、DecisionTree、XGB)进行训练。
训练代码如下:
# -*- coding: UTF-8 -*-
import re
import numpy as np
import pandas as pd
from sklearn import metrics
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.ensemble import AdaBoostClassifier,VotingClassifier,StackingClassifier
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
# from mlxtend.classifier import StackingClassifier
from xgboost import XGBClassifier
from sklearn.linear_model import LinearRegression, LogisticRegression

import joblib

# traincsv = "/mnt/data/train/train.csv"
# tmpcsv = "/tmp/3space.csv"
# model_path = "/tmp/"
# train_shell_path = "/mnt/data/train/"
traincsv = "./train.csv"
tmpcsv = "./data/3space.csv"
model_path = "./model/"
train_shell_path = "./train/"
features_len = 0
f_input=open(tmpcsv, 'w+')

def predict_one(model,data,target):
    pred = model.predict(data)
    print(pred)
    print(target)
    pass
def create_models():
    models = [
        AdaBoostClassifier(base_estimator=DecisionTreeClassifier(max_depth=26), n_estimators=101),
        GradientBoostingClassifier(n_estimators=101),
        RandomForestClassifier(n_estimators=101),
        XGBClassifier(),
        DecisionTreeClassifier(max_depth=26)
    ]
    estimators = []
    i = 1
    for m in models:
        estimators.append(((str(i)), m))
        i += 1

    stack = StackingClassifier(estimators)

    return stack

def read_file(filename):
    text = b""
    with open(filename, "rb"as f:
        for line in f:
            line = line.strip(b"rt")
            text += line
        result = re.compile('"name":.*?]', re.S)  # 正则匹配
        theresult = re.findall(result, str(text))
        text = ''.join(theresult)
        text = text.replace('['' ')
        text = text.replace(']'' ')
        text = text.replace('\'' ')
        text = text.replace(','' ')
        text = text.replace('"'' ')
        text = text.replace('name :'' ')
        text = re.sub('<(.*?)>','',text)
        text = re.sub('FROM `(.*?)`','',text)
        text = re.sub('INSERT INTO `(.*?))','',text)
        text = re.sub('UPDATE `(.*?)WHERE','',text)
        text = re.sub('REPLACE INTO `(.*?))','',text)
        text = re.sub('`(.*?)`=','',text)
        text = re.sub('SELECT `(.*?)`','',text)
        #INSERT INTO `
        text = text.replace(':'' ')
        text = text.replace('STMT_LIST'' ')
        text = text.replace('ZVAL'' ')
        text = text.replace('NULL'' ')
        # text = text.encode("utf-8")
        text = ' '.join(text.split()).encode("utf-8")
    return text

def get_features(data,line):
    key_num=0
    capital_len=0
    namespace_c = 0
    op_c = 0
    class_c = 0
    passw_c = 0
    include_c = 0
    FUNC_DECL_c = 0
    unquote_c = 0
    eval_c = 0 
    shell_c = 0
    hack_c = 0
    backdoor_c = 0
    arg_c = 0 
    post_c = 0
    file_c = 0
    get_c = 0 
    b64dec_c = 0
    flate_c = 0 
    iua_c = 0 
    stl_c = 0 
    smqr_c = 0
    muf_c = 0
    sys_c = 0
    curl_c = 0
    funexit_c = 0
    call_c = 0
    oppoint_c = 0

    key_num=data.count('str_rot13')+data.count('serialize')+data.count('eval')+data.count('base64_decode')+data.count('strrev')
    +data.count('assert')+data.count('file_put_contents')+data.count('fwrite')+data.count('curl_exec')+data.count('passthru')+data.count('exec')
    +data.count('dl')+data.count('readlink')+data.count('popepassthru')+data.count('preg_replace')+data.count('create_function')+data.count('array_map')
    +data.count('call_user_func')+data.count('array_filter')+data.count('usort')+data.count('stream_socket_server')+data.count('pcntl_exec')+data.count('system')
    +data.count('chroot')+data.count('scandir')+data.count('chgrp')+data.count('shell_exec')+data.count('proc_open')+data.count('proc_get_status')
    +data.count('popen')+data.count('ini_alter')+data.count('ini_restore')+data.count('ini_set')+data.count('LD_PRELOAD')+data.count('_GET')+data.count('_POST')+data.count('_COOKIE')
    +data.count('_FILE')+data.count("phpinfo")+data.count("_SERVER")
    namespace_c = data.count("NAMESPACE")
    op_c = data.count("BINARY_OP")
    class_c = data.count("CLASS")
    passw_c = data.count("passw")
    #ASSIGN_REF
    include_c = data.count("INCLUDE_OR_EVAL")
    FUNC_DECL_c = data.count("FUNC_DECL")
    unquote_c = data.count("`")
    # eval_c = data.count("INCLUDE_OR_EVAL(eval)")
    eval_c = data.count(" eval ")
    shell_c = data.count("shell")
    hack_c = data.count("hack")
    backdoor_c = data.count("backdoor")
    capital_len=len(re.compile(r'[0-9]').findall(data))

    capital_f=capital_len/len(data)#大写字母频率
    post_c = data.count("_POST")
    file_c = data.count("_FILE")
    get_c = data.count("_GET")
    b64dec_c = data.count("base64_decode")
    flate_c =data.count("flate")
    iua_c = data.count("ignore_user_abort")
    stl_c = data.count("set_time_limit")
    smqr_c = data.count("set_magic_quotes_runtime")
    muf_c = data.count("move_uploaded_file")
    sys_c = data.count(" system ")
    curl_c = data.count(" curl_exec ")
    arg_c = data.count("ARG_LIST")
    content_list = re.split(r' ',data)
    max_length = 0
    for i in content_list:
        if len(i) > max_length:
            max_length = len(i)
        else:
            pass
    funexit_c = data.count(" function_exists ")
    call_c = data.count(" CALL ")
    oppoint_c = data.count("(.)")
    temp = data.count("passw")
    if line[2] == "black":
        #测试输出用
        # print(temp)
        pass
    else:
        # print(temp)
        pass

    autowrite(
        len(data),
        # namespace_c,
        # class_c,
        op_c,
        arg_c,
        capital_f,
        key_num,
        passw_c,
        include_c,
        FUNC_DECL_c,
        max_length,
        unquote_c,
        eval_c,
        shell_c,
        # hack_c,
        # backdoor_c,
        call_c,
        line=line
    )

def autowrite(*features,line):
    global tmpcsv
    global f_input
    global features_len
    if line[2] == "black":
        label = 1.0
    else:
        label = 0.0
    features_len = len(features)
    wtf = "%f"+",%f"*len(features)
    features = features + (label, )
    f_input.write(wtf % features+'n')

def generate():
    global traincsv
    global train_shell_path
    head_row = pd.read_csv(traincsv, nrows=0)
    head_row_list = list(head_row)
    csv_result = pd.read_csv(traincsv, usecols=head_row_list)
    row_list = csv_result.values.tolist()
    for line in row_list:
        if line[1] == "jsp":
            continue
        else:
            data = read_file(train_shell_path+str(line[0])).decode('utf-8')

            if(len(data)>0):
                get_features(data,line)
                
generate()
feature_max = pd.read_csv(tmpcsv)
arr=feature_max.values
data = np.delete(arr, -1, axis=1#删除最后一列

target=arr[:,len(arr[0])-1]
#随机划分训练集和测试集
train_data,test_data,train_target,test_target = train_test_split(data,target,test_size=0.3,random_state=10)
#模型
model1=DecisionTreeClassifier(max_depth=5)
model2=GradientBoostingClassifier(n_estimators=100)
model3=AdaBoostClassifier(model1,n_estimators=100)
model4 = create_models()
# model4 = test()
model1.fit(train_data,train_target)#训练模型
model2.fit(train_data,train_target)#训练模型
model3.fit(train_data,train_target)#训练模型
model4.fit(train_data,train_target)

joblib.dump(model4, model_path+'all.model')
print("all.model has been saved to '/tmp/all.model'")
y_pred2=model4.predict(test_data)#预测
print("y_pred:%s"%y_pred2)
print("test_target:%s"%test_target)
#Verify
print('Precision:%.3f' %metrics.precision_score(y_true=test_target,y_pred=y_pred2))#查全率
print('Recall:%.3f' %metrics.recall_score(y_true=test_target,y_pred=y_pred2))#查准率
print(metrics.confusion_matrix(y_true=test_target,y_pred=y_pred2))#混淆矩阵

print("features_len:"+str(features_len))
远程加载模型的预测代码如下:
# -*- coding: UTF-8 -*-
import re
import pandas as pd
import numpy as np
import joblib

jsp_list = []
f_input = ""

features_len = 13
def read_file(filename):
    text = b""
    with open(filename, "rb"as f:
        for line in f:
            line = line.strip(b"rt")
            text += line
        result = re.compile('"name":.*?]', re.S)  # 正则匹配
        # val": 
        # result = re.compile('val":"(.*?)"', re.S)
        theresult = re.findall(result, str(text))
        text = ''.join(theresult)
        text = text.replace('['' ')
        text = text.replace(']'' ')
        text = text.replace('\'' ')
        text = text.replace(','' ')
        text = text.replace('"'' ')
        text = text.replace('name :'' ')
        text = re.sub('<(.*?)>','',text)
        text = re.sub('FROM `(.*?)`','',text)
        text = re.sub('INSERT INTO `(.*?))','',text)
        text = re.sub('UPDATE `(.*?)WHERE','',text)
        text = re.sub('REPLACE INTO `(.*?))','',text)
        text = re.sub('`(.*?)`=','',text)
        text = re.sub('SELECT `(.*?)`','',text)
        #INSERT INTO `
        text = text.replace(':'' ')
        text = text.replace('STMT_LIST'' ')
        text = text.replace('ZVAL'' ')
        text = text.replace('NULL'' ')
        # text = text.encode("utf-8")
        text = ' '.join(text.split()).encode("utf-8")
    return text

def autowrite(*features,line):
    global tmpcsv
    global f_input

    wtf = "%f"+",%f"*len(features)
    features = features + (line[0], )
    f_input.write(wtf % features+'n')

def get_features(data,line):
    key_num=0
    capital_len=0
    namespace_c = 0
    op_c = 0
    class_c = 0
    passw_c = 0
    include_c = 0
    FUNC_DECL_c = 0
    unquote_c = 0
    eval_c = 0
    shell_c = 0
    hack_c = 0
    backdoor_c = 0
    arg_c = 0 
    post_c = 0
    file_c = 0
    get_c = 0 
    b64dec_c = 0
    flate_c = 0 
    iua_c = 0
    stl_c = 0 
    smqr_c = 0 
    muf_c = 0 
    sys_c = 0 
    curl_c = 0
    funexit_c = 0
    call_c = 0 
    oppoint_c = 0

    key_num=data.count('str_rot13')+data.count('serialize')+data.count('eval')+data.count('base64_decode')+data.count('strrev')
    +data.count('assert')+data.count('file_put_contents')+data.count('fwrite')+data.count('curl_exec')+data.count('passthru')+data.count('exec')
    +data.count('dl')+data.count('readlink')+data.count('popepassthru')+data.count('preg_replace')+data.count('create_function')+data.count('array_map')
    +data.count('call_user_func')+data.count('array_filter')+data.count('usort')+data.count('stream_socket_server')+data.count('pcntl_exec')+data.count('system')
    +data.count('chroot')+data.count('scandir')+data.count('chgrp')+data.count('shell_exec')+data.count('proc_open')+data.count('proc_get_status')
    +data.count('popen')+data.count('ini_alter')+data.count('ini_restore')+data.count('ini_set')+data.count('LD_PRELOAD')+data.count('_GET')+data.count('_POST')+data.count('_COOKIE')
    +data.count('_FILE')+data.count("phpinfo")+data.count("_SERVER")
    namespace_c = data.count("NAMESPACE")
    op_c = data.count("BINARY_OP")
    class_c = data.count("CLASS")
    passw_c = data.count("passw")
    #ASSIGN_REF
    include_c = data.count("INCLUDE_OR_EVAL")
    FUNC_DECL_c = data.count("FUNC_DECL")
    unquote_c = data.count("`")
    eval_c = data.count(" eval ")
    shell_c = data.count("shell")
    hack_c = data.count("hack")
    backdoor_c = data.count("backdoor")
    capital_len=len(re.compile(r'[0-9]').findall(data))

    capital_f=capital_len/len(data)#大写字母频率
    post_c = data.count("_POST")
    file_c = data.count("_FILE")
    get_c = data.count("_GET")
    b64dec_c = data.count("base64_decode")
    flate_c =data.count("flate")
    iua_c = data.count("ignore_user_abort")
    stl_c = data.count("set_time_limit")
    smqr_c = data.count("set_magic_quotes_runtime")
    muf_c = data.count("move_uploaded_file")
    sys_c = data.count(" system ")
    curl_c = data.count(" curl_exec ")
    arg_c = data.count("ARG_LIST")
    content_list = re.split(r' ',data)
    max_length = 0
    for i in content_list:
        if len(i) > max_length:
            max_length = len(i)
        else:
            pass
    funexit_c = data.count(" function_exists ")
    call_c = data.count(" CALL ")
    oppoint_c = data.count("(.)")
    temp = data.count("passw")

    
    autowrite(
        len(data),
        # namespace_c,
        # class_c,
        op_c,
        arg_c,
        capital_f,
        key_num,
        passw_c,
        include_c,
        FUNC_DECL_c,
        max_length,
        unquote_c,
        eval_c,
        shell_c,
        # hack_c,
        # backdoor_c,
        call_c,
        line=line
    )

def generate():
    global f_input
    global features_len
    feat = "0.000000,"*features_len
    f_input=open("./data/temp.csv"'w')
    f_input.write(feat+"1.000000n")
    # head_row = pd.read_csv('/tcdata/test.csv', nrows=0)
    head_row = pd.read_csv('/tmp/temp.csv', nrows=0)
    head_row_list = list(head_row)
    # csv_result = pd.read_csv('/tcdata/test.csv', usecols=head_row_list)
    csv_result = pd.read_csv('/tmp/temp.csv', usecols=head_row_list)
    row_list = csv_result.values.tolist()
    for line in row_list:
        if line[1] == "white":
            
            f_input.write(feat+"%fn"% line[0])
        else:
            data = read_file('/tcdata/test/'+str(line[0])).decode('utf-8')
            if(len(data)>0):
                get_features(data,line)
            else:
                f_input.write(feat+"%fn"% line[0])


generate()
# -*- coding: UTF-8 -*-
import pandas as pd
import numpy as np
import joblib

features_len = 13

clf = joblib.load('./model/all.model')
feature_max = pd.read_csv('./data/temp.csv')
arr=feature_max.values
test_data = np.delete(arr, -1, axis=1#删除最后一列
id=arr[:,features_len]

y_pred=clf.predict(test_data)
res_list = []
for i in range(0,len(id)):
    tmplist = {}
    if y_pred[i] == 0:
        tmplist["prediction"] = "white"
    else:
        tmplist["prediction"] = "black"
    tmplist["file_id"] = int(id[i])
    res_list.append(tmplist)
df = pd.DataFrame(res_list, columns=['file_id''prediction'])
df.to_csv("/tmp/res_php_1.csv", index=False)
使用上述方案后得到如下成绩:
"php_precision_score": 0.82581, "php_recall_score": 0.83657
可以看到相对于不进行特征提取来说,其准确率有较大提升,但召回率却降低了;在这里笔者尝试将不处理特征和处理特征的流程相结合,首先利用不处理任何特征的高召回率筛选出一批标记为黑的结果,然后将这些标记为黑的样本借助处理json特征后的模型的高精确率进行第二轮筛选,将这一轮筛选出的黑样本标记为黑,剩余样本标记为白,得到最终的预测结果。经过上面的实验,远程的预测成绩如下:
"php_precision_score": 0.86867, "php_recall_score": 0.89839

JSP样本方案

对于jsp样本来说同样还是首先不对AST的json进行任何处理,使用xgb分类器直接使用开源项目中的代码进行训练和预测,远程的预测成绩如下:
"jsp_precision_score": 0.97611, "jsp_recall_score": 0.76334
与php的结果恰恰相反,其准确率很高,召回率相对来说低了一些,结合预测成绩以及对jsp样本的了解,远程的环境中应该存在着一部分变异样本,但在ast层面上其实都是一样的,无非会调用到命令执行、类加载、写文件等代码,于是我尝试了下提取了一些关键字,如下:
['java.lang.Process','getRuntime','webshell','Cmd','password','IDENTIFIER:defineClass','IDENTIFIER:processCmd','IDENTIFIER:MethodWebHell','IDENTIFIER:webhell','IDENTIFIER:URLClassLoader','IDENTIFIER:ReflectInvoker','IDENTIFIER:MyClassLoader']
在xgb模型预测完后,将其预测的白样本进行关键字判断,当关键字数量大于 0 时,将其手动标记为黑样本,增加了一层关键字判断后,远程成绩如下:
"jsp_precision_score": 0.95642, "jsp_recall_score": 0.89488
可以看到召回率有了明显的提升。
最后将 PHP和JSP的预测模型结合后,最终成绩如下:
{"score": 0.8883, "php_precision_score": 0.86867, "php_recall_score":  0.89839, "jsp_precision_score": 0.95642, "jsp_recall_score": 0.89488}

最后贴一下整体的模型流程图。

webshell检测算法实践

后续改进点

上述方案个人感觉有很大的改进空间,首先在训练模型时会发现每次训练时划分出训练集和测试集后,有时的预测成绩差别较大,最大的成绩F1 Score差距大概在2分左右,个人感觉貌似是训练样本中存在着一些会干扰训练效果的样本,由于笔者水平有限和竞赛时间及测评次数的原因,没有搞明白这个问题;还有就是可以利用给出的带标记的ast训练样本做一套自己的污点分析算法,但个人觉得貌似工作量会很大,还有一点就是之前也没做过相关的尝试,一时感觉无从上手,有兴趣的师傅可以加联系方式交流下,微信wha1er
上述的方案完整代码已上传到github,地址如下:
https://github.com/yemoli/AMWD_2022_WEBSHELL

参考链接

https://www.freebuf.com/articles/web/254913.html

https://github.com/hooog/webshellDc

+ + + + + + + + + + + 
春秋GAME最新推出了一个赛题创作平台——星汉平台,欢迎有意向的CTFer通过星汉平台提交您的研究实践,有可能通过我们让您的研究成果被更多人看到。
https://racest.ichunqiu.com
如在使用过程有任何问题可联系vx:cium0309


关于伽玛实验室

     伽玛实验室(GAMELAB)聚焦网络安全竞赛研究领域,覆盖网络安全赛事开发、技术研究、赛制设计、赛题研发等方向。秉承“万物皆可赛”的信念,研究内容涉及WEB渗透、密码学、二进制、AI、自动化利用、工控等多个重点方向,并将5G、大数据、区块链等新型技术与网安竞赛进行融合,检验新型技术应用安全性的同时,训练网安人员的实战能力。同时,不断创新比赛形式,积极推动反作弊运动,维护网安竞赛健康长远发展。团队成员以95后为主,是支极具极客精神的年轻团队。

webshell检测算法实践


webshell检测算法实践

原文始发于微信公众号(春秋伽玛):webshell检测算法实践

  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2023年6月22日16:06:32
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   webshell检测算法实践http://cn-sec.com/archives/1827695.html

发表评论

匿名网友 填写信息