webshell扫描器

参考《web安全之深度学习实战》这本书

前言

目前常用的木马病毒等的检测工具主要有两种:一种是静态检测,通过匹配一些特征码或者危险函数来识别webshell等,但是这种方式需要完善的匹配规则,而且只能识别已知的webshell。另一种是动态检测,通过检测文件执行时表现出来的特征,查看它是否是一个webshell的文件。
本程序采用机器学习算法的朴素贝叶斯算法和深度学习的MLP算法,实现两种对webshell的检测方式,结合两种算法的预测结果,对一个文件是否是webshell进行判断。
使用到了scikit-learn,安装教程https://blog.csdn.net/weixin_42886817/article/details/102716587

原理

TF-IDF

TF是词频,表示一个词在一个文档中出现的频率,它的计算公式是:
\(TF=\frac{\text {该词在该文档中出现的次数}}{\text {文档中所有词的次数和}}\)
IDF是逆文档率,表示一个词在所有文档中出现的频率,它的计算公式是:
\(IDF=\log \frac{\text {文档总数}}{1+\text {包含该词的所有文档数}}\)
TF-IDF实际就是TF×IDF,表示一个词对与文档集中一个文件的重要程度。主要思想是,如果一个词对一个文件重要,那么它在这个文件出现的频率高,并且在其它文件出现的频率低。那么这个词就可以代表这个文档的特征,可以用来分类。

Ngram

Ngram是一个基于概率的判别模型,它输入一句话,即单词的顺序,输出是这句话的概率,即这些单词的联合概率。而该模型基于这样一种假设,第N个词的出现只与前面N-1个词相关,而与其它任何词都不相关,整句的概率就是各个词出现概率的乘积。

朴素贝叶斯算法

https://www.cnblogs.com/Qi-Lin/p/12274001.html

MLP

MLP是一种神经网络,它的层与层之间的神经元是全连接的。激活函数一种是Sigmoid函数。公式如下:
\(s(x)=\frac{1}{1+e^{-x}}\)

webshell扫描器

训练过程和预测过程其实本质如下:

webshell扫描器

webshell扫描器

Opcode

opcode为中间代码,当解释器完成对脚本代码的分析后,便将它们生成可以直接运行的中间代码。对于php文件执行时,通常流程为虚拟机从文件系统读取php文件、扫描其词典和表达式、解析文件、创建要执行的opcode,最后执行opcode。
项目设计之前首先要配置好php的vld扩展相关环境。
vld的安装:http://www.asarea.cn/diary/265
vld下载地址:https://windows.php.net/
通过命令‘php.exe -dvld.active=1 -dvld.execute=0 文件名’进行分析,如下图所示,分析的opcode为:FETCH_R FETCH_DIM_R SEND_VAR DO_FCALL ECHO RETURN

webshell扫描器

在python中执行系统命令采用subprocess模块https://www.jianshu.com/p/430c411160f8

处理流程

贝叶斯模型

其处理流程为:对webshell文件和正常的php文件提取关键词,关键词的特征最大采用10000个。之后通过TF-IDF进行计算。然后将数据集分为训练集和测试集,使用朴素贝叶斯算法在训练集上进行训练,之后通过测试集在训练好的模型上进行测试,验证效果。

webshell扫描器

MLP模型

其处理流程为:对webshell文件和正常的php文件进行编译分析,得到其对应的opcode。Opcode的特征数最大采用10000个。之后通过NGram进行处理。然后将数据集分为训练集和测试集,使用MLP在训练集上进行训练,之后通过测试集在训练好的模型上进行测试,验证效果。

webshell扫描器

使用opcode和NGram处理时,设定的NGram参数为2,即每个opcode只考虑它与前后有关系。其过程如下示例:

webshell扫描器

MLP网络隐藏层使用两层,第一层五个神经元,第二层两个。示例如下:

webshell扫描器

扫描器设计

通过将训练好的模型进行保存,当进行扫描时,先对扫描文件进行提取关键词,进行TF-IDF处理后,用保存的朴素贝叶斯模型进行预测。之后再对扫描文件分析opcode,进行2gram的处理,用保存的MLP模型进行预测。最后将两次预测结果进行与操作,得到最准确的扫描结果,进行报警。其过程如下:

webshell扫描器

数据收集

对《web安全之深度学习实战》这本书所带的php文件,对它们进行初步处理,按照序号重命名,其中正常文件825个,webshell文件614个

分析数据

朴素贝叶斯模型

通过load_word函数加载文件内容,通过os.walk读取文件夹下所有文件,通过遍历,判断是否是php文件,如果是进行读取。在读取前首先使用chardet包的detect函数检测文件的编码方式,之后以该编码方式打开。读取后进行初步的处理,将换行符等字符替换为空格,将其加入到读取的内容列表中,列表中每一个元素代表一个文档的内容,同时函数返回读取的对应的文档名的列表。其函数流程如下:

webshell扫描器

通过B_get_feature函数得到用于朴素贝叶斯训练的特征。
该函数首先通过调用load_word函数得到了正常文件和webshell文件的内容,并且进行标记,webshell文件标记为1,正常文件标记为0。然后通过CountVectorizer的fit_transform函数提取词向量,并且最多只保留10000个特征,即10000个词,得到词频结果。得到的词频矩阵如图所示,如第一行表示,第0个文档的在词集中序号为1843的词出现了2次,之后可以通过toarray转换为矩阵形式。
CountVectorizer的方法参数说明https://blog.csdn.net/weixin_38278334/article/details/82320307
同时要保存相应的词集,以便应用模型时,对未知文件的特征的提取,词集包含10000个作为特征的词,以json格式保存。
最后对得到的矩阵进行TF-IDF的处理。通过TfidfTransformer的fit_transform得到各个文件的各个词的TF-IDF的值。结果如图所示,如第一行表示,第0个文件在词集中第9342个词的TF-IDF值为0.048694,之后可以通过toarray转换为矩阵形式。

webshell扫描器

MLP模型

通过get_opcode函数编译php文件,得到对应的opcode。编译环境我选择的为php5.6.27的版本。通过php扩展进行编译分析,命令为php.exe -dvld.active=1 -dvld.execute=0 文件名。执行命令采用的subprocess包的getstatusoutput函数,该函数返回两个值,一个为执行的状态,一个为返回的结果。但是返回的结果如图所示,包含很多无用的信息,我通过正则匹配进行处理提取。而opcode为带下划线的大写字母,所以匹配规则为:\s(\b[A-Z_]{2,}\b)\s,提取的opcode至少为两个字符。最后将opcode以空格拼接为一个序列写入对应的文件中。

webshell扫描器

通过MLP_get_feature函数得到用于MLP模型的特征。
该函数首先通过调用load_opcode函数得到了正常文件和webshell文件分析后的opcode,并且进行标记,webshell文件标记为1,正常文件标记为0。然后通过CountVectorizer的fit_transform函数提取词向量,并且最多只保留10000个特征,即10000个opcode,得到opcode词频结果,这些特征首先采用2Gram处理,即要考虑每个单独的opcode和前一个和后一个的关系,即统计时,统计前后两个opcode一起出现的概率。其余与朴素贝叶斯的结果大致相同。
最后要保存相应的词集,以便应用模型时,对未知文件的特征的提取,词集包含10000个作为特征的词,以json格式保存

webshell扫描器

训练和测试算法

两个模型都通过交叉验证进行验证,将数据集的40%分为测试集,60%用于训练。

朴素贝叶斯模型

验证结果,在预测为webshell的文件中,有329个为真webshell文件,4个为正常文件,而有21个webshell文件没有检测出来。

- webshell文件 正常文件
检索到 329 4
未检索到 21 219

准确率为:0.95
精确率为:0.98
召回率为:0.91

MLP模型

验证结果,在预测为webshell的文件中,有323个为真webshell文件,8个为正常文件,而有4个webshell文件没有检测出来。

- webshell文件 正常文件
检索到 323 8
未检索到 4 236

准确率为:0.98
精确率为:0.97
召回率为:0.98

使用模型

界面设计

界面采用pyqt拖拽设计,之后再自动生成对应的代码,设计如图所示

webshell扫描器

openFile函数为选择文件按钮的槽函数,通过点击按钮,显示选择文件窗口,选择文件后,将文件目录显示在条形编辑框区域。
打开文件或文件夹参考https://www.jianshu.com/p/98e8218b2309
train_btn函数为训练模型按钮的槽函数,通过点击按钮,为train函数创建新线程,进行模型的训练,防止界面卡死。
Train函数为训练模型的函数,该函数调用前文所述的一些列函数,进行训练,并且为了以后扫描的方便,在前文所述函数中,使用dump函数,保存已经训练好的模型,扫描预测时直接调用即可。
sklearn模型的保存https://www.cnblogs.com/zichun-zeng/p/4761602.html
dump使用需要导入from sklearn.externals import joblib但出现报错,直接import joblib导入
scan_btn函数为开始扫描按钮的槽函数,通过点击按钮,为scan函数创建新线程,进行模型的训练,防止界面卡死。
scan函数为扫描函数,通过读取条形编辑框区域的文件路径,遍历路径下的文件,同样使用两种模型,采用前文所述方法对文件进行处理,处理时读取保存的词集进行提取特征。之后读取保存的模型,对文件进行预测。不同的是预测结果结合两种模型的预测结果,通过与两种模型的预测结果给出最终的预测结果。将可能是webshell的文件显示出来。

扫描演示

在扫描时需要先训练模型,模型训练后会进行保存,如果模型已经训练完成,就可以直接进行扫描。扫描速度还是十分快的,主要影响扫描速度的因素是,在进行MLP预测时,需要对文件进行编译,分析opcode,占用了扫描的时间。

webshell扫描器

源代码

getShell.py

import os
import re
import subprocess
import json
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.naive_bayes import GaussianNB
from sklearn.model_selection import train_test_split
from sklearn import metrics
from sklearn.neural_network import MLPClassifier
import joblib
import chardet
import time


# 获取文件的opcode
def get_opcode(path, rpath):
    for filepath, dirnames, filenames in os.walk(path):
        for filename in filenames:
            filetype = os.path.splitext(filename)[1]
            filetype = filetype.lower()
            if filetype == '.php':
                cmd = 'D:/phpstudy/PHPTutorial/php/php-5.6.27-nts/php.exe -dvld.active=1 -dvld.execute=0 ' + os.path.join(
                    filepath, filename)
                try:
                    status, output = subprocess.getstatusoutput(cmd)
                    #print(output)
                    if status == 0:
                        opcode_list = re.findall(r'\s(\b[A-Z_]{2,}\b)\s', output)
                        opcode = ' '.join(opcode_list)
                        pre = str(time.time()) + '.txt'
                        file = os.path.join(rpath, pre)
                        with open(file, 'w') as f2:
                            f2.write(str(opcode))
                    else:
                        print("执行失败" + filename)
                except:
                    print("执行失败" + filename)


# 对php文件进行处理
def get_shell(path, rpath, i):
    for filepath, dirnames, filenames in os.walk(path):
        for filename in filenames:
            i = i + 1
            filetype = os.path.splitext(filename)[1]
            filetype = filetype.lower()
            if filetype == '.php':
                file = os.path.join(filepath, filename)
                with open(file, 'r') as f:
                    result = f.read()
                pre = str(i) + '.php'
                file = os.path.join(rpath, pre)
                with open(file, 'w') as f2:
                    f2.write(result)
    return i


# 得到文件的内容
def load_word(path):
    result = []
    filelist = []
    for filepath, dirnames, filenames in os.walk(path):
        for filename in filenames:
            filetype = os.path.splitext(filename)[1]
            filetype = filetype.lower()
            if filetype == '.php':
                with open(os.path.join(filepath, filename), 'rb') as f:
                    r = f.read()
                    de = chardet.detect(r)['encoding']
                try:
                    result.append(r.decode(encoding=de).replace('\r', ' ').replace('\n', ' ').replace('\t', ' '))
                    filelist.append(os.path.join(filepath, filename))
                except:
                    print("失败" + filename)
    return result, filelist


# 得到opcode文件的opcode
def load_opcode(path):
    result = []
    filelist = []
    for filepath, dirnames, filenames in os.walk(path):
        for filename in filenames:
            with open(os.path.join(filepath, filename), 'r') as f:
                r = f.read()
            try:
                result.append(r)
                filelist.append(os.path.join(filepath, filename))
            except:
                print("失败" + filename)
    return result, filelist


# 得到用于MLP训练的特征
def MLP_get_feature():
    webshell, list1 = load_opcode(r'C:\Users\Desktop\php\webshellopt')
    y1 = [1] * len(webshell)
    normal, list2 = load_opcode(r'C:\Users\Desktop\php\normalopt')
    y2 = [0] * len(normal)
    x = webshell + normal
    y = y1 + y2
    CV = CountVectorizer(ngram_range=(2, 2), decode_error="ignore", max_features=10000, token_pattern=r'\b\w+\b',
                         min_df=1, max_df=1.0)
    #print(CV.fit_transform(x))
    x = CV.fit_transform(x).toarray()
    x_2 = CV.vocabulary_
    x_2 = json.dumps(x_2)
    with open(r'C:\Users\Desktop\php\x_2.json', 'w') as f:
        f.write(x_2)
    clf = MLPClassifier(solver='lbfgs', alpha=1e-5, hidden_layer_sizes=(5, 2), random_state=1)
    mo = clf.fit(x, y)
    joblib.dump(mo, 'xx.model')
    return x, y


# 得到用于贝叶斯训练的特征
def B_get_feature():
    webshell, list1 = load_word(r'C:\Users\Desktop\php\webshell')
    y1 = [1] * len(webshell)
    normal, list2 = load_word(r'C:\Users\Desktop\php\normal')
    y2 = [0] * len(normal)
    x_1 = webshell + normal
    y = y1 + y2
    CV = CountVectorizer(decode_error="ignore", max_features=10000, token_pattern=r'\b\w+\b', min_df=1, max_df=1.0)
    x = CV.fit_transform(x_1).toarray()
    #print(CV.fit_transform(x_1))
    #保存词集
    x_1 = CV.vocabulary_
    x_1 = json.dumps(x_1)
    with open(r'C:\Users\Desktop\php\x_1.json', 'w') as f:
        f.write(x_1)

    transformer = TfidfTransformer(smooth_idf=False)
    x_tfidf = transformer.fit_transform(x)
    print(x_tfidf)
    x = x_tfidf.toarray()
    gnb = GaussianNB()
    mo = gnb.fit(x, y)
    #保存模型
    joblib.dump(mo, 'x.model')
    return x, y


# 统计模型好坏的函数
def do_metrics(y_test, y_pred):
    print("分类准确率为:")
    print(metrics.accuracy_score(y_test, y_pred))
    print("混淆矩阵为:")
    print(metrics.confusion_matrix(y_test, y_pred))
    print("精确率为:")
    print(metrics.precision_score(y_test, y_pred))
    print("召回率为:")
    print(metrics.recall_score(y_test, y_pred))


# 进行贝叶斯预测
def do_nb(x, y):
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.4, random_state=0)
    gnb = GaussianNB()
    gnb.fit(x_train, y_train)
    y_pred = gnb.predict(x_test)
    do_metrics(y_test, y_pred)


# 进行MLP预测
def do_mlp(x, y):
    clf = MLPClassifier(solver='lbfgs', alpha=1e-5, hidden_layer_sizes=(5, 2), random_state=1)
    x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.4, random_state=0)
    clf.fit(x_train, y_train)
    y_pred = clf.predict(x_test)
    do_metrics(y_test, y_pred)


# 对收集的数据进行处理
def dealDate():
    # 处理数据集
    path = 'C:/Users/Desktop/data/webshell/w/'
    rpath = r'C:\Users\Guoyang\Desktop\网安设计\php\normal'
    i = 0
    i = get_shell(path, rpath, i)
    path = 'C:/Users/Desktop/data/webshell/normal/'
    i = get_shell(path, rpath, i)
    print('正常文件有' + str(i))

    path = 'C:/Users/Desktop/data/webshell/b/'
    rpath = r'C:\Users\Guoyang\Desktop\网安设计\php\webshell'
    i = 0
    i = get_shell(path, rpath, i)
    path = 'C:/Users/Desktop/data/webshell/webshell/'
    i = get_shell(path, rpath, i)
    print('木马文件有' + str(i))

# #得到opcode
# path = r'C:\Users\Desktop\php\normal'
# rpath = r'C:\Users\Desktop\php\normalopt1'
# get_opcode(path,rpath)
# path = r'C:\Users\Desktop\php\webshell'
# rpath = r'C:\Users\Desktop\php\webshellopt'
# get_opcode(path,rpath)

# #训练贝叶斯,预测
# x,y=B_get_feature()
# do_nb(x,y)
#
# 训练MLP,预测
x,y=MLP_get_feature()
do_mlp(x,y)

webshellScan.py

# -*- coding: utf-8 -*-

# Form implementation generated from reading ui file 'webshellScan.ui'
#
# Created by: PyQt5 UI code generator 5.13.2
#
# WARNING! All changes made in this file will be lost!


from PyQt5 import QtCore, QtGui, QtWidgets


class Ui_Form(object):
    def setupUi(self, Form):
        Form.setObjectName("Form")
        Form.resize(1000, 661)
        self.widget = QtWidgets.QWidget(Form)
        self.widget.setGeometry(QtCore.QRect(20, 40, 961, 591))
        self.widget.setObjectName("widget")
        self.verticalLayout = QtWidgets.QVBoxLayout(self.widget)
        self.verticalLayout.setContentsMargins(0, 0, 0, 0)
        self.verticalLayout.setObjectName("verticalLayout")
        self.horizontalLayout = QtWidgets.QHBoxLayout()
        self.horizontalLayout.setObjectName("horizontalLayout")
        self.lineEdit = QtWidgets.QLineEdit(self.widget)
        self.lineEdit.setObjectName("lineEdit")
        self.horizontalLayout.addWidget(self.lineEdit)
        self.pushButton_3 = QtWidgets.QPushButton(self.widget)
        self.pushButton_3.setObjectName("pushButton_3")
        self.horizontalLayout.addWidget(self.pushButton_3)
        self.pushButton = QtWidgets.QPushButton(self.widget)
        self.pushButton.setObjectName("pushButton")
        self.horizontalLayout.addWidget(self.pushButton)
        self.pushButton_2 = QtWidgets.QPushButton(self.widget)
        self.pushButton_2.setObjectName("pushButton_2")
        self.horizontalLayout.addWidget(self.pushButton_2)
        self.verticalLayout.addLayout(self.horizontalLayout)
        self.label = QtWidgets.QLabel(self.widget)
        self.label.setText("")
        self.label.setObjectName("label")
        self.verticalLayout.addWidget(self.label)
        self.textBrowser = QtWidgets.QTextBrowser(self.widget)
        self.textBrowser.setObjectName("textBrowser")
        self.verticalLayout.addWidget(self.textBrowser)

        self.retranslateUi(Form)
        QtCore.QMetaObject.connectSlotsByName(Form)

    def retranslateUi(self, Form):
        _translate = QtCore.QCoreApplication.translate
        Form.setWindowTitle(_translate("Form", "webshell扫描工具"))
        self.pushButton_3.setText(_translate("Form", "选择文件"))
        self.pushButton.setText(_translate("Form", "训练模型"))
        self.pushButton_2.setText(_translate("Form", "开始扫描"))

webshellScan_ui.py

from webshellScan import Ui_Form
from PyQt5 import QtWidgets
from PyQt5.QtWidgets import *
import sys
from getShell import *
import joblib
import threading


class myWin(QtWidgets.QWidget, Ui_Form):
    def __init__(self):
        super(myWin, self).__init__()
        self.setupUi(self)
        self.pushButton_3.clicked.connect(self.openFile)
        self.pushButton_2.clicked.connect(self.scan_btn)
        self.pushButton.clicked.connect(self.train_btn)

    def openFile(self):
        dir = QFileDialog.getExistingDirectory(self.widget, "打开目录", '')
        self.lineEdit.setText(dir)

    def train(self):
        self.label.setText('正在训练模型……')
        global x1_train, x2_train, y1_train, y2_train
        x1_train, y1_train = B_get_feature()
        x2_train, y2_train = MLP_get_feature()
        self.label.setText('训练完成!')
        # self.textBrowser.append('训练完成')

    def train_btn(self):
        t = threading.Thread(target=self.train)
        t.start()

    def scan(self):
        path = self.lineEdit.text()
        self.label.setText('加载词集……')
        # 加载词集
        with open(r'C:\Users\Guoyang\Desktop\php\x_1.json', 'r') as f:
            x_1 = json.load(f)
            print(x_1)
        with open(r'C:\Users\Desktop\php\x_2.json', 'r') as f:
            x_2 = json.load(f)
        self.label.setText('进行贝叶斯预测……')
        # 贝叶斯预测
        x1, filelist = load_word(path)
        CV = CountVectorizer(decode_error="ignore", max_features=10000, token_pattern=r'\b\w+\b', min_df=1, max_df=1.0,
                             vocabulary=x_1)
        x1 = CV.fit_transform(x1).toarray()
        transformer = TfidfTransformer(smooth_idf=False)
        x_tfidf = transformer.fit_transform(x1)
        x1 = x_tfidf.toarray()
        # 导入训练好的模型
        gnb = joblib.load('x.model')
        y1_pred = gnb.predict(x1)

        # MLP预测
        self.label.setText('进行MLP训练……')
        self.label.setText('得到opcode……')
        if os.path.exists(r'C:\Users\Desktop\opcode') == False:
            os.mkdir(r'C:\Users\Desktop\opcode')
        get_opcode(path, r'C:\Users\Desktop\opcode')
        x2, filelist2 = load_opcode(r'C:\Users\Desktop\opcode')
        CV = CountVectorizer(ngram_range=(2, 2), decode_error="ignore", max_features=10000, token_pattern=r'\b\w+\b',
                             min_df=1, max_df=1.0, vocabulary=x_2)
        x2 = CV.fit_transform(x2).toarray()
        # 导入训练好的模型
        clf = joblib.load('xx.model')
        y2_pred = clf.predict(x2)
        # 最终预测结果
        self.label.setText('最终预测结果')
        y = y1_pred & y2_pred
        y = list(y)
        self.textBrowser.append('可能存在威胁的文件')
        for i in range(len(y)):
            if y[i] == 1:
                self.textBrowser.append(filelist[i])

    def scan_btn(self):
        t = threading.Thread(target=self.scan)
        t.start()


if __name__ == "__main__":
    app = QtWidgets.QApplication(sys.argv)
    Widget = myWin()
    Widget.show()
    sys.exit(app.exec_())

上一篇:zookeeper事务处理-Follower


下一篇:Python3 源码阅读 深入了解Python GIL