一直以来,蜜罐系统建立对溯源系统来说有鸡肋之嫌,基于公网的蜜罐系统最早的产品形式是: wooyun当时做的一套蜜罐系统,主要是模拟底层服务弱口令(ssh、rdp、telnet)、数据库服务弱口令(mysql、postgresql、redis)、web应用(nginx、apache、tomcat等)、应用漏洞(OpenSSL心脏出血、Java RMI 反序列化代码执行、Struts2远程代码执行等)但是,这种主要是针对具有扫描器功能漏洞渗透自动化程序其作用,获取攻击源IP地址,获取植入恶意样本,然后沙盒运行样本做C&C地址分析。但是为了获取蜜罐样本的全面性,我们需要投入大量的基础设施建设成本,同时针对,我们需要DDoS定向溯源的地址还不一定有帮助。
那有没有更好的方式方法,可以第一时间发现DDoS呢?我们能不能通过一种更简便更轻资产的办法获得目前公网存活的恶意样本?根据我们的DDoS溯源的以往的经验上看,很多黑客喜欢通过http共享恶意文件方式,HFS是他们首选,,那我们是不是可以把全网HFS扫描出来,然后把这些恶意程序放到一个养鸡场,当说发起DDoS攻击的时候,我们养鸡场中的Agent也接到攻击命令,我们就知道哪个C&C Server发起功能,攻击的目标是谁。在这里我们引入一个概念:养鸡场,就是把认为是恶意样本都放到一起,正常饲养起来。
1. 快速识别全国HFS服务器
① shodan
a. 免费版只能翻两页 20条记录。
② censys
a. 是免费也提供API操作,但是准确性存疑,免费的东西。。。
③ zoomeye
a. 要求实名注册,国内的限制,这个。。。
a. 专业版1个月3000提供API,在线查询10000条/次,基础版在线1000条。穷。。。
① masscan
a. banner识别有限
② zmap+grab
a. censys就使用这种方式,结果不准确
③ nmap
a. 服务指纹识别准确但是速度是短板
④ Zoomeye
a. 前端:vue
b. 后端:Django
c. 数据库:MongoDB+Elasticsearch
d. 扫描Worker/调度:Lucifer,
e. 组件指纹识别引擎Wmap/Xmap
⑤ 某朋友的扫描系统
a. 客户端:masscan+nmap+nginx+redis+haproxy
b. 服务器端:Elasticsearch+ celery + django+ spark
① 目前系统处在验证阶段先简单架构,先扫描一下国内IP端。
② 端口扫描:masscan
③ 服务指纹识别:nmap
④ 数据库:postgresql
⑤ 恶意文件验证:下载HFS服务器文件通过API发送virustotal校验。
2. 养鸡场部署
① 更少的资源跑更多的蜜罐(杜鹃)
① IDS监控IP和DNS对外连接情况(Suricata+ES)
① 外网发包pps监控告警
在这之前写过一篇有关扫描器的文章(Freebuf bt0sea),当时的需求只是监控本公司IP段高危端口连接。没几个B段。但是现在监控的是全中国IP地址段,这有点多,直接nmap猴年马月也扫不完呀。
Problem 1:如果准确的拿到国内IP段
apnic|CN|ipv4||16384|20110412|allocated apnic|CN|ipv4||32768|20110316|allocated apnic|CN|ipv4||65536|20110316|allocated apnic|CN|ipv4||65536|20110224|allocated apnic|CN|ipv4||65536|20110224|allocated apnic|CN|ipv4||1048576|20110216|allocated apnic|CN|ipv4||524288|20110222|allocated apnic|CN|ipv4||131072|20110228|allocated apnic|CN|ipv4||65536|20110316|allocated apnic|CN|ipv4||16384|20110412|allocated
def getip(f,ipinfo=None): if ipinfo is None: ipinfo = [] for i in (f): if i.find("apnic") > -1 and i.find("ipv4") > -1 and i.find("summary") == -1: iplist = i.split("|") country = iplist[1] if country == 'CN': ipinfo.append(i) return(ipinfo)
Problem 2:端口扫描
启动masscan 30个进程
def Scan(): try: global g_queue while not g_queue.empty(): item = g_queue.get() result = "result"+item+".json" p = subprocess.Popen("/root/masscan/masscan "+item+" -p80 -oJ "+result, shell=True) p.wait() if g_queue.qsize() == 0: print (u'公网IP端口扫描结束') return "ok" except Exception,e: print e return e if __name__ == '__main__': reload(sys) sys.setdefaultencoding('utf-8') listThread = [] ### 获取ip列表加入全局变量中 conn = sqlite3.connect("ipwhois.db") cur = conn.cursor() print "Opened database successfully" cursor = cur.execute("SELECT inetnum from ipwhois") for row in cursor: tmpstring = re.split(' - ',row[0]) # print tmpstring[0], tmpstring[1], tmp=tmpstring[0]+'-'+tmpstring[1] g_queue.put(tmp) print tmp conn.close() for i in xrange(g_threadNum): thread = ScanThread(Scan) thread.start() listThread.append(thread) for thread in listThread: thread.join() print thread
Problem 3:服务指纹识别扫描
def store(result): fw = open('iplist.txt', 'a') with open(result,'r') as f: for line in f: if line.startswith('{ '}: try: temp = json.loads(line[:-2]) fw.writelines(temp["ip"]+'n') print temp["ip"] except: continue def list_dir_names(urPath): dirNames = os.listdir(urPath) for ones in dirNames: ones.decode('gbk')
return dirNames if __name__ == '__main__': reload(sys) sys.setdefaultencoding('utf-8') filelist=[] filelist=glob.glob("*.json") for item in filelist: if os.path.getsize(item)!=0: print item store(item) else: continue
def Scan(): try: global g_queue while not g_queue.empty(): item = g_queue.get() result = "result"+item.strip()+".xml" p = subprocess.Popen("/usr/bin/nmap -oX "+result+" -sV -p80 "+item, shell=True) p.wait() print "size = ", g_queue.qsize() if g_queue.qsize() == 0: print (u'公网IP扫描结束') return "ok" except Exception,e: print e return e def ImportiIPlist(): storejson_file=open('iplist.txt','rb') lines = storejson_file.readlines() for line in lines: print line g_queue.put(line) g_totalsize = g_queue.qsize() print g_totalsize if __name__ == '__main__': reload(sys) sys.setdefaultencoding('utf-8') listThread = [] ImportiIPlist() for i in xrange(g_threadNum): thread = ScanThread(Scan) thread.start() listThread.append(thread) for thread in listThread: thread.join() print thread
Problem 4:扫描结果入库
def Scan(): try: timeout = 300 tableName = "%s_%s" % ("scanresult", time.strftime("%Y%m%d")) conn1 = psycopg2.connect(database="postgres", user="postgres", password="[email protected]#", host="", port="5432") cur1 = conn1.cursor() g_totalsize = g_queue.qsize() while not g_queue.empty(): item = g_queue.get() print item ctime = strftime("%Y-%m-%d %H:%M:%S", gmtime()) print ctime nmap_report = NmapParser.parse_fromfile(item) for scanned_hosts in nmap_report.hosts: print scanned_hosts.address for serv in scanned_hosts.services: if serv.state == "open": m = serv.service_dict.get('extrainfo', '') print m if m.find(''')!=-1: pass else: sql = "insert into %s (task_id,ctime, address,port,service,product,product_version,product_extrainfo,os) VALUES ('%s','%s','%s','%s','%s','%s','%s','%s','%s')" sqlCmd = sql%(tableName,g_task_id,ctime,scanned_hosts.address,str(serv.port),serv.service,serv.service_dict.get('product', ''),serv.service_dict.get('version', ''),serv.service_dict.get('extrainfo', ''),'NULL') print sqlCmd cur1.execute(sqlCmd) conn1.commit() print "size = ", g_queue.qsize() g_size = g_queue.qsize() num = "%.2f" %(float(g_size)/float(g_totalsize)) if g_queue.qsize() == 0: print (u'公网IP扫描结束') return "ok" except Exception,e: print e pass def CreateTable(): curC = connC.cursor() sqlCreate = "create table if not exists %s ( task_id TEXT, ctime TEXT, address TEXT, port TEXT, service TEXT, product TEXT , product_version TEXT, product_extrainfo TEXT, os TEXT )" tableName = "%s_%s"%("scanresult", time.strftime("%Y%m%d")) sqlCmd = sqlCreate%tableName curC.execute(sqlCmd) curC.close() connC.commit() connC.close() def SetTask(): filelist = [] filelist = glob.glob("*.xml") for item in filelist: if os.path.getsize(item) != 0: print item g_queue.put(item) else: continue def main(): listThread = [] SetTask() CreateTable() for i in xrange(g_threadNum): thread = ScanThread(Scan) thread.start() listThread.append(thread) for thread in listThread: thread.join() print thread if __name__ == '__main__': reload(sys) sys.setdefaultencoding('utf-8') main()
SELECT DISTINCT(address),port,product,product_version,product_extrainfo FROM scanresult_20171231 WHERE product LIKE '%HttpFileServer%'