导语:公有云厂商为什么要建立威胁情报系统?威胁情报的价值在哪里?我们不做会有什么损失?威胁情报交换能给我们带来什么附加价值?

0x00、公有云厂商安全威胁

Q:公有云厂商为什么要建立威胁情报系统?威胁情报的价值在哪里?我们不做会有什么损失?威胁情报交换能给我们带来什么附加价值?

各个问题都很尖锐~

业务层面

根据自己的公有云厂商业务特点(提供云主机、云存储、云数据库、云GPU),建立威胁情报体系,那么面临的威胁 主要集中在DDoS攻击和主机入侵。

1、有效的做到事中及时防御,降低用户遭受DDoS攻击总量,减少DDoS用户攻击成本。

2、DDoS攻击溯源,云安全增值服务。

3、内部针对性攻击事件溯源。

技术层面

1、历史追溯:通过聚类关联分析内部外部历史威胁情报数据,获得攻击事件

domain/IP/URL/samples/analysis report

2、及时阻断:能够对DDoS发起点(botnet主控端)、中间过程发起点(反射源/匿名代理TORP2P节点等)、到达点(直接攻击源),造成攻击无法持续。侧重点在前两部分。

3、DDoS溯源,找到攻击源关联的Domain注册信息、Email信息、云主机注册信息等任何通过身份信息注册的账号。

下面是一些攻击证据展示:

Threat Top 1、公有云平台业务系统和云上用户遭受DDoS攻击

Proof 1、没一天停过,在黑洞之前,攻击峰值总和维持在30G左右。

1.png

Proof 2、云上用户占比7成、云基础设施占3成。

11.png

Threat Top 2、云内主机中马

Proof 1、对外DDoS攻击告警

111.png

Proof 2、CDN网络遭受入侵

Proof3、DGA DNS Analytics解析监控,Xshell 感染终端监控。

当然还有很多威胁,我就不在这里赘述。

0x01、如何自建公有云厂商威胁情报系统

首先,需要建立一整套威胁情报生命周期管理:

在保护云上用户的隐私的前提下,有效的建立一套自动化情报收集、情报处理(特征提取工程、关联分析)、情报应用(抗D 、云WAF黑白名单、云基础设施定向攻击排查),以及情报共享(与拥有情报数据资源的厂商交换数据)。

其次,在带宽和服务器资源允许的情况下,使用DPI/DFI技术、蜜罐技术、沙箱技术建立自身的威胁情报收集系统,提供高质量的威胁情报(注意,是带证据的IOC情报)、通过AI技术提高检测速度和降低误报率。

自动化情报收集系统

@1、云数据中心NIDS部署,集中收集日志。

Suricata IDS + ELK

网络拓扑

1111.png

主要关注:shellcode监控、mysqlRDPSSH登陆尝试、木马活动、DoS等

@2、DDoS攻击数据源抓取

先说说技术方案选型问题,考虑到性能问题,主要是抓5元组的数据,那么DPI方案不成,xflow流量采样的方式(例如:绿盟NTA解决方案),1:1000抓包,在高速DDoS攻击的时候是无法抓到瞬间脉冲攻击包的。好吧,只有自己开发基于DPDK的数据统计和抓包系统。

2.png

@3、Web威胁数据收集

基础架构是复用DDoS Pcapdump服务器,根据云平台基础设施IP列表,抓取7层数据,存储到redis,然后汇总到数据仓库中。

@4、云平台基础设施服务器部署HIDS系统

有两种选择,一是使用传统的HIDS软件的方式收集,二是使用EDR机器学习的方法分析,我选择了后者。

22.png

222.png

详细描述一下在这一领域的实践之路:

1、训练数据选择

选择800万良性文件和400万恶意文件进行训练,Top 10 恶意家族是:

3.png

2、训练模型选择

(1)    通过https://github.com/erocarrera/pefile.git(VirusTotal和Cuckoo都在使用)访问PE文件(PE头信息、PE导入表、PE导出表、PE节数据、PE版本信息) 哈希所有部分。

Linux系统、MacOSX系统使用https://github.com/lief-project/LIEF 获取 elf文件和MachO文件头相关信息,并且哈希。

(2)    通过对准确率、漏报率、性能、模型大小、查询执行时间的评估来选择模型。(目前使用sklearn库,将来打算移植到ML Toolkit for TensorFlow,听说训练速度可以提升50倍,毕竟GPU越来越吊)

33.png

通过对文件判断的malware分类结果上报数据,提供基于主机的威胁情报源。

@5、公网蜜罐系统

333.png

有关公网蜜罐有以下思考:

(1)需要用高交互蜜罐,直接修改真实服务加入log记录系统,当然log系统尽量做的隐藏些。

蜜罐本身有一定威胁情报产生能力(降低误报(3次以上持续攻击才记录)、生成有价值情报(例如:web蜜罐产生sqlixss等情报数据,而不是只收集access log))

(2)有一套完善的基础服务管理系统,可以快速部署和销毁蜜罐系统,符合大规模部署要求。

情报处理

其实就是建立一套威胁溯源平台,平台提供查询接口,提供一个可视化的关系图图,例如:

3333.png

单更重要的情报逻辑处理部分:

这里以DDoS一次攻击事件分析说起:

(1)    首先通过自动化收集系统中收集到的攻击数据:

Data1:{攻击目的IP/攻击时间/攻击类型/攻击峰值/攻击持续时间}

Data2: {攻击目的IP /攻击目的端口/攻击时间/攻击源IP/攻击源端口/地理位置/攻击IP运营商/包大小}  取Top 1000数据

(2)    集合历史攻击数据 ,获取超过2次攻击源IP

(3)    通过端口扫描,回扫。确定IP存活状态和开放端口状态。

(4)    通过外联威胁情报查询IP信誉。确定IP状态排除TOR/匿名代理等

(5)    通过内外情报源查询攻击IP对应malware样本。

(6)    关联蜜罐系统收集样本,获取botnet Server地址。

(7)    对发生攻击的botnet Server发起反制措施。

情报共享

建立提高情报数据共享接口,同时需要对对外提供数据做脱敏处理。

@1、进入到ELK数据,建立威胁情报对外接口

(1)入库:

#!/usr/bin/env python
#coding=utf-8
import traceback
from elasticsearch import Elasticsearch
import sys
import json
import datetime,time
import psycopg2
connC = psycopg2.connect(database="postgres", user="xxx", password="xxx", host="127.0.0.1",port="5432")
conn1 = psycopg2.connect(database="postgres", user="xxx", password="xxx", host="127.0.0.1",port="5432")
def GetNum():
         es = Elasticsearch('x.x.x.x')
         data = es.search(index=index_day, body={"query": {"match_all": {}}},size=1)
         return data["hits"]["total"]
def Getidslog():
         es = Elasticsearch('x.x.x.x')
         tmp_str = datetime.datetime.now().strftime('%Y.%m.%d')
         index_day = 'logstash-' + tmp_str
         es.indices.put_settings(
                                                 {"index": {
                                                        "max_result_window": 500000
                                                 }})
         num = GetNum()
         data = es.search(index=index_day, body={"query": {"match_all": {}}}, size=num)
         datalen=len(data["hits"]["hits"])
         tableName = "%s_%s" % ("idslog", time.strftime("%Y%m%d"))
         cur1 = conn1.cursor()
         try:
              for c in xrange(0,datalen):
#                   print c
                     if data["hits"]["hits"][c]:
                            m_ctime=data["hits"]["hits"][c]["_source"]["@timestamp"]
                            m_src_ip=data["hits"]["hits"][c]["_source"]["src_ip"]
                            m_category=data["hits"]["hits"][c]["_source"]["alert"]["category"]
                            m_signature=data["hits"]["hits"][c]["_source"]["alert"]["signature"]
                            sql = "INSERT INTO %s (ctime,src_ip, category,signature) VALUES ('%s','%s','%s','%s')"
                            sqlCmd = sql % (tableName, m_ctime, m_src_ip, m_category, m_signature)
                            print sqlCmd
                            cur1.execute(sqlCmd)
                            conn1.commit()
         except Exception,e:
              traceback.print_exc()
def CreateTable():
    curC = connC.cursor()
    sqlCreate = "create table if not exists %s ( 
                 ctime TEXT,
                 src_ip TEXT,
                 category TEXT,
                 signature TEXT
                 )"
    tableName = "%s_%s"%("idslog", time.strftime("%Y%m%d"))
    sqlCmd = sqlCreate%tableName
    curC.execute(sqlCmd)
    curC.close()
    connC.commit()
if __name__ == '__main__':
         CreateTable()
         Getidslog()

(2)通过django web方式提供API接口。

  url(r'^api/outxxx/id$', outputAPI.as_view()),//获取某个IP对应的威胁情报,(只提供必要的情报,内部数据需要脱敏)
url(r'^api/outxxx/IPlist$', outputIPlistAPI.as_view()), //获得所有ip列表
class outputAPI(APIView):
    def get(self, request, format=None):
        m_src_ip = request.GET.get("ip") //安全机制已屏蔽
        print m_src_ip
        tableName ='idslog_20170814'
        conn1 = psycopg2.connect(database="postgres", user="xxx", password="xxx", host="127.0.0.1",
                                port="5432")
        cur1 = conn1.cursor()
        SQL1="select * from %s WHERE src_ip=%s" %(tableName,m_src_ip)
        cur1.execute(SQL1)
        rows = cur1.fetchall()
        list =[]
        for row in rows:
            m = {"ctime": row[0], "src_ip": row[1],"category":row[2],"signature":row[3]}
            list.append(m)
        b=json.dumps(list)
        return HttpResponse(b)
class outputIPlistAPI(APIView):
    def get(self, request, format=None):
        tableName = 'idslog_20170814'
        conn1 = psycopg2.connect(database="postgres", user="xxx", password="xxx", host="127.0.0.1",
                                 port="5432")
        cur1 = conn1.cursor()
        SQL1 = "select DISTINCT(src_ip) from {} ".format(tableName)
        cur1.execute(SQL1)
        rows = cur1.fetchall()
        list = []
        for row in rows:
            m = {"src_ip": row[0]}
            list.append(m)
        b = json.dumps(list)
        return HttpResponse(b)

0x03、总结

公有云厂商自建威胁情报系统,是一个长期的投入,虽然短期看不出来多少效果,但是针对数据情报的积累到一定量的时候会有质的变化。

源链接

Hacking more

...