零、前言

在社区看到了这篇日志分析的文章--《Web日志安全分析浅谈》,文章整体写的非常棒,对日志分析的作用、难点、工程化建设和攻击溯源等方面进行了全面的描述。去年的毕设我也做了相关的研究,主要是去实现一个日志分析系统,算是一个更加的完整的工程化建设,这里把一些关键的过程与大家分享。

一、系统设计

在开发一个项目之前当然要先做好设计,明白自己想要的是一个什么系统,可以使用哪些技术、算法和硬件设备。我们分成功能设计、数据库设计、算法结构设计、硬件拓扑设计、前端界面设计、主框架设计6个部分。

1.1功能设计

系统应包括系统监控、用户管理(系统使用人员)、日志管理、实时分析、离线分析等功能,并为用户提供可视化的操作、分析与结果展示界面。功能结构图如图所示:

1.2数据设计

系统使用MySQL数据库,库中需要建立logmanagement数据库,拥有user、offline、online三个数据表,分别为用户表、离线数据表、在线数据表。数据库中的数据表如下:

offline数据表用于存储离线日志的分析结果,每一个上传的日志文件对应一条记录,包括名称、大小、类型、起止日期、访问量最高的前10个IP地址、访问量最高的前10个URL、10大攻击类型的攻击次数、以及攻击者和被攻击者的地理位置信息。数据表结构如下:

online数据表用于存储实时分析的中间结果,数据表的结构如下:

user表是管理员的用户表,用来存储管理员的个人信息。

1.3算法结构设计

系统使用了三种机器学习算法进行恶意攻击的识别:逻辑回归、支持向量机和朴素贝叶斯。同时包含了传统的正则匹配算法,正则虽然无法识别未知攻击,但是在已知攻击的识别上误报率相对机器学习是比较低的。为了能够识别爆破、目录扫描等与时序有关的攻击,还应设计数值统计模块进行恶意ip访问频率的计算。此外,多种算法如何结合需要进行足够的实验,谁的权重(对结果的影响)更大?并行还是串行?本系统中对正则匹配、数值统计和机器学习(三种机器学习算法两两取交集,即实行投票机制,三种中两者检测出异常则认为异常)进行串行处理,得出一条日志的识别结果--正常或恶意(具体到攻击类型),然后检测结果(而非日志)存储到数据库中,算法结构如图所示:

1.4硬件拓扑设计

为了实现系统对日志的高效收集,使用了Flume框架;为了具有大数据的处理能力,使用了Spark和HDFS做计算和存储。其中Flume与HDFS是完美兼容的,可以很方便的实现实时日志收集。这几个框架都是分布式的,结构大概如下所示

1.5前端界面设计

为了提供一个良好的用户交互性能,需要一个便捷的可视化界面,这里选用Flask框架开发一个Web管理平台,包含对服务器状态的监控、日志的管理以及分析结果的可视化等。

1.6主框架设计

主框架要能够说明系统的总体功能及数据流走向,其中,日志获取有两种途径,Web界面负责接收用户的离线上传,Flume负责实时获取;HDFS负责日志存储,自动将获取(离线和实时)的日志备份到各个节点上;Spark负责日志处理,运行特征匹配、数值统计和机器学习算法对其进行识别和分类;MySQL负责结果存储,根据日志获取途径,存储到不同的表中;Flask和Echarts负责界面展示与操作,使用多种图表样式,形象化地展示分析结果。如图所示:

离线分析就是用户通过Web界面将文本日志文件上传进行分析,相对简单,实时分析就需要严格控制数据流的走向。这里就像一个生产者与消费者的模型,Flume不断收集日志(生产)存储到HDFS,Spark Streaming不断的从HDFS读取日志(消费),实时结构如下:

二、系统实现

2.1日志预处理

我们知道一条日志大概是这样的

115.28.44.151 - - [28/Mar/2014:00:26:10 +0800] "GET /manager/html HTTP/1.1" 404 162 "-" "Mozilla/3.0 (compatible; Indy Library)"

字段含义为:远程IP - 用户名 时间 请求主体 响应码 请求字节 请求来源 客户端信息
想要对日志进行识别分析,首先要对各字段进行提取,其中攻击识别主要依靠“请求主体”,我们可以如下正则进行提取

log_Pattern = r'^(?P<remote_addr>.*?) - (?P<remote_user>.*) \[(?P<time_local>.*?)\] "(?P<request>.*?)" '\
        '(?P<status>.*?) (?P<body_bytes_sent>.*?) "(?P<http_referer>.*?)" "(?P<http_user_agent>.*?)"$'

2.2正则匹配

算法的匹配正则来自与网络和一些CMS厂商的的正则代码,经过多次修改测试可以识别常见的已知的Web攻击,包括SQL注入、XSS攻击、命令执行等常见Web漏洞。比如部分正则如下所示:

self.SQL_pattern = """/select(\s)+|insert(\s)+|update(\s)+|(\s)+and(\s)+|(\s)+or(\s)+|delete(\s)+|\'|\/\*|\*|\.\.\/
        |\.\/|union(\s)+|into(\s)+|load_file(\s)+|outfile(\s)+"""
self.Webshell_pattern = """(preg_replace.*\/e|`.*?\$.*?`|\bcreate_function\b|\bpassthru\b|\bshell_exec\b|\bexec\b|
        \bbase64_decode\b|\bedoced_46esab\b|\beval\b|\bsystem\b|\bproc_open\b|\bpopen\b|\bcurl_exec\b|\bcurl_multi_exec\b|
        \bparse_ini_file\b|\bshow_source\b|cmd\.exe|KAdot@ngs\.ru|小组专用大马|提权|木马|PHP\s?反弹|shell\s?加强版|
        WScript\.shell|PHP\s?Shell|Eval\sPHP\sCode|Udp1-fsockopen|xxddos|Send\sFlow|fsockopen\('(udp|tcp)|SYN\sFlood)|
        z0|z1|z2|z9|caidao"""
self.XSS_pattern = """xss|javascript|vbscript|expression|applet|meta|xml|blink|link|style|script|embed|object|
        iframe|frame|frameset|ilayer|layer|bgsound|title|base|onabort|onactivate|onafterprint|onafterupdate|
        onbeforeactivate|onbeforecopy|onbeforecut|onbeforedeactivate|onbeforeeditfocus|onbeforepaste|onbeforeprint|
        onbeforeunload|onbeforeupdate|onblur|onbounce|oncellchange|onchange|onclick|oncontextmenu|oncontrolselect|
        oncopy|oncut|ondataavailable|ondatasetchanged|ondatasetcomplete|ondblclick|ondeactivate|ondrag|ondragend|
        ondragenter|ondragleave|ondragover|ondragstart|ondrop|onerror|onerrorupdate|onfilterchange|onfinish|onfocus|
        onfocusin|onfocusout|onhelp|onkeydown|onkeypress|onkeyup|onlayoutcomplete|onload|onlosecapture|onmousedown|
        onmouseenter|onmouseleave|onmousemove|onmouseout|onmouseover|onmouseup|onmousewheel|onmove|onmoveend|onmovestart|
        onpaste|onpropertychange|onreadystatechange|onreset|onresize|onresizeend|onresizestart|onrowenter|onrowexit|
        onrowsdelete|onrowsinserted|onscroll|onselect|onselectionchange|onselectstart|onstart|onstop|onsubmit|
        onunload(\s)+"""

所有的攻击类型如下所示

2.3数值统计

在所采集海量日志文本中,包含了大量用户行为、交互IP、访问次数等信息,这些信息所表现出的统计特征可以明确地表达一个网络动作,而有些动作通过传统的规则匹配、黑白名单、策略控制等方式是很难发现的。比如在一段时间内访问目标网站的Agent连接数、不同域名下出现同一URL的次数、访问应答结果中非200的请求比例等,所有这些统计结果都表达了某种特定的网络行为,而这一行为如果符合网络攻击的行为,则通过数值统计的方法就能发现。比如下表中列举的常用的基于数值统计的方式发现潜在异常行为的一些统计方法。

在实现中只进行了一定时间内某ip访问频率的计算

def check(self,dataRDD,sc):
    """按分钟切割日志,以判断访问频率"""
    data_Memory = dataRDD.collect()
    start = data_Memory[0]
    temp_Time = time.strptime(start[2], "%d/%m/%Y:%H:%M:%S")
    start_Time = datetime.datetime(temp_Time[0],temp_Time[1],temp_Time[2],temp_Time[3],temp_Time[4],temp_Time[5])
    data_Min = [] #用来存储一分钟内切割的数据
    data_Result = []
    label = self.label
    for line in data_Memory:
        temp_Time = time.strptime(line[2], "%d/%m/%Y:%H:%M:%S")
        end_Time = datetime.datetime(temp_Time[0],temp_Time[1],temp_Time[2],temp_Time[3],temp_Time[4],temp_Time[5])
        if (end_Time-start_Time).seconds <= 10:
            data_Min.append(line)
        else:
            data_Result += label(data_Min)
            start_Time = end_Time
            data_Min = []
            data_Min.append(line)
    tempRDD = sc.parallelize(data_Result)
    return tempRDD

2.4特征向量

使用机器学习算法的前提是构造好的特征向量,日志的识别主要是针对日志记录中的request、referer和user-agent。request、referer都是URL路径,user-agent是浏览器名称,这三部分是用户可控且可能注入payload的地方。向量的构造方法主要参考用机器学习玩转恶意URL检测基于机器学习的web异常检测,训练集分为两个部分,一个是恶意的在URL请求,主要收集于github中知名的payload仓库,大约有30000条数据,其中包括SQL注入、Traversal(目录遍历)、XSS(跨站脚本攻击)、LFI(本地文件包含)、XML注入、SSI注入、XPATH注入、Webshell攻击。恶意请求部分样例如下:

二是正常的URL请求,测试部分包括日志中的request、referer和user-agent,其中request和referer的正常样本基本一致,都是URL请求地址,user-agent虽然并不是URL但在受到攻击时仍和request、referer这两处相似,都是注入相关漏洞的payload,所以这三处在分类的可以使用相同模型。其中正常的URL取自国外的日志网站SecRepo的正常Web日志,正常请求部分样例如下:

User-agent是指了各大浏览器厂商正常的名称,训练集中正常请求部分样例如下:

将上述的训练集一分为二,90%作为训练集集,10%作为测试集并进行打标用于测试。在真正分类的时候,将所有的日志依据request、referer和user-agent这三个部分进行二分类。向量构造首先通过N-Gram将文本数据向量化,比如对于下面的例子:

首先通过长度为N的滑动窗口将文本分割为N-Gram序列,例子中,N取2,窗口滑动步长为1,可以得到如下N-Gram序列:

其中N的取值需要进行多次试验,不同的算法最佳值不同。然后声明一个长度为10000的特征向量,将这些序列映射到特征向量中,并使用TF-IDF生成特征向量的值。词频—逆文档频率(简称TF-IDF)是一种用来从文本文档(例如URL)中生成特征向量的简单方法。它为文档中的每个词计算两个统计值:一个是词频(TF),也就是每个词在文档中出现的频率,另一个是逆文档频率(IDF),用来衡量一个词在整个文档语料库中出现的(逆)频繁程度。这两个值的积,也就是TF×IDF,展示了一个词与特定文档的相关程度(比如这个词在某文档中很常见,但在整个语料库中却很少见)。

def TFIDF(self,badData,goodData,distance,step):
    '''IT-IDF函数,根据不同的分词方法生成TF-IDF向量'''
    tf = self.tf
    badFeatures = badData.map(lambda line: tf.transform(split2(line,distance,step)))
    goodFeatures = goodData.map(lambda line: tf.transform(split2(line,distance,step)))
    badFeatures.cache()
    goodFeatures.cache()
    idf = IDF()
    idfModel = idf.fit(badFeatures)
    badVectors = idfModel.transform(badFeatures)
    idfModel = idf.fit(goodFeatures)
    goodVectors = idfModel.transform(goodFeatures)
    badExamples = badVectors.map(lambda features: LabeledPoint(1, features))
    goodExamples = goodVectors.map(lambda features: LabeledPoint(0, features))
    dataAll = badExamples.union(goodExamples)
    return dataAll

一个TF-IDF向量如下所示:

其中第一项0.0是向量的标签,表示这是一条恶意的请求,后面是各个分词序列在投影后的坐标及其TF×IDF值。

2.5机器学习算法

三种算法训练完毕后以后的检测只需从本地加载模型即可

def train(self,sc):
    # #生成Logistic和SVMWithSGD算法数据
    # dataLogistic = self.TFIDF(bad,good,3,1)
    # #生成SVMWithSGD算法数据
    # dataSVMWithSGD = self.TFIDF(bad,good,3,1)
    # #生成NaiveBayes算法数据
    # dataNaiveBayes = self.TFIDF(bad,good,2,1)
    # 使用分类算法进行训练,iterations位迭代次数,step为迭代步长
    # modelLogistic = LogisticRegressionWithSGD.train(data=dataLogistic,iterations=10000,step=6) 
    # print "train success1"
    # modelLogistic.save(sc,"model/modelLogistic")
    # modelSVMWithSGD = SVMWithSGD.train(data=dataSVMWithSGD,iterations=10000,step=5) 
    # print "train success2"
    # modelSVMWithSGD.save(sc,"model/modelSVMWithSGD")
    # modelNaiveBayes = NaiveBayes.train(data=dataNaiveBayes,lambda_=0.1) 
    # print "train success3"
    # modelNaiveBayes.save(sc,"model/modelNaiveBayes")
    self.modelLogistic = LogisticRegressionModel.load(sc,"modelLogistic")
    self.modelSVMWithSGD = SVMModel.load(sc,"modelSVMWithSGD")
    self.modelNaiveBayes = NaiveBayesModel.load(sc,"modelNaiveBayes")

def check_Line(self,line,algorithm):
    """元素检测"""
    tf = self.tf
    request_url = line
    check_Result = 0
    if "Logistic" in algorithm:
        check_Result += self.modelLogistic.predict(tf.transform(split2(request_url,3,1)))
    if "SVM" in algorithm:
        check_Result += self.modelSVMWithSGD.predict(tf.transform(split2(request_url,3,1)))
    if "NaiveBayes" in algorithm:
        check_Result += self.modelNaiveBayes.predict(tf.transform(split2(request_url,2,1)))
    print check_Result
    print "model check  :  "+str(check_Result)
    if check_Result>2:
        line.append([-1])
    else:
        line.append([])
    return line

def check(self,test,sc,algorithm="Logistic,SVM,NaiveBayes"):
    """执行模型检测"""
    self.train(sc)
    check_Line = self.check_Line
    temp1 = test.map(lambda line: check_Line(line,algorithm))
    return temp1.collect()

三、系统展示

离线日志分析
离线分析包括分析报表和日志管理两个子功能,用户需要在日志管理处上传日志才可通过分析报表查看分析结果(如果直接点击分析报表界面则默认显示最近一次的分析结果),日志在上传的过程中就会完成数据分析,分析结果会在分析报表界面显示并同时写入数据库。同理,用户也可以在日志管理处删除已上传的日志,但同时也会删除存在数据库中的分析结果。日志管理界面如图所示:

点击每一条记录右侧的查看按钮,即可跳到相应的分析报表界面,分析报表界面包含5个部分,分别为基本信息、访问次数最高的前10个IP、访问次数最高的前10个URL、攻击次数统计、攻击源地图。具体如图所示:





实时日志分析
实时分析部分包含两个显示界面,一个是访问次数(蓝色)与攻击次数(黑色)的双曲线图表,表示当前时间访问的次数以及当中可能包含的攻击次数,两者同时显示,相互对比;另一个是百度地图实时地理位置的世界地图图表。显示界面如下:

四、一些问题

1、程序运行起来虽然看起来还可以,但是识别率其实比较一般,一是正则写的不够完善;二是机器学习误报有点高,如果把判别条件放太宽,会出现一些低级分类错误。
2、算法中机器学习其实只是一个二分类,具体的攻击类别要靠正则识别,正则识别不出来而算法识别出来的则为未知攻击类型。
3、这里的实时其实是伪实时。

五、参考文献

http://www.freebuf.com/articles/web/126543.html
http://www.freebuf.com/articles/web/134334.html
http://www.freebuf.com/sectool/126698.html
http://blog.csdn.net/xnby/article/details/50782913

源链接

Hacking more

...