这是 酒仙桥六号部队 的第 63 篇文章。
全文共计7453个字,预计阅读时长20分钟。
-
使用awk来将日志里面的所有的IP筛选出来保存到一个文本文档中。
awk '{print $1}' access.log >ip.txt
-
将ip.txt 文件中的IP进行排序,去重和计数。这个 192.168.2.7 IP访问次数过多,肯定是有问题的,后续对这个ip进行重点排查。
sort ip.txt |uniq -c
-
根据上面发现ip 192.168.2.7 短时间对目标网站发起了大量的请求。
-
在这里可以看到报出了大量的404,请求方式为HEAD,根据这些可以判断。192.168.2.7这个IP在2020年7月18日14:20:23对网站进行了扫描,以此来判断网站存在的一些敏感文件。
-
从下面日志可以看出攻击ip访问登录接口,进行爆破,并在 2020年7月18日16:29:35 爆破成功,进行登录,日志状态返回200。
-
访问了phpinfo敏感文件。
-
一般的攻击者登录成功后,在后台一般都是找上传点或者命令执行的地方获取shell。不想获取shell的黑客(QVQ你懂的)。匹配路由关于upload 关键词日志发现攻击者已成功上传shell.php文件。
-
对shell.php 文件进行查看,发现是冰蝎木马。后续需要对主机入侵痕迹进行排查。(下文以编写脚本的方式进行简单的逐项检测)
https://github.com/Bypass007/Emergency-Response-Notes
https://github.com/grayddq/GScan.git
#coding:utf-8
import os
import json
class Linux_Check:
def __init__(self):
ipadd="ifconfig -a | grep Bcast | awk -F "[ :]+" '{print $4}'"
self.passmax="cat /etc/login.defs | grep PASS_MAX_DAYS | grep -v ^# | awk '{print $2}'"
self.passmin="cat /etc/login.defs | grep PASS_MIN_DAYS | grep -v ^# | awk '{print $2}'"
self.passlen="cat /etc/login.defs | grep PASS_MIN_LEN | grep -v ^# | awk '{print $2}'"
self.passage="cat /etc/login.defs | grep PASS_WARN_AGE | grep -v ^# | awk '{print $2}'"
self.uid="awk -F[:] 'NR!=1{print $3}' /etc/passwd"
self.sshd_config="cat /etc/ssh/sshd_config | grep -v ^# |grep 'PermitRootLogin no'"
self.bash_histrory="cat /etc/profile|grep HISTSIZE|head -1|awk -F[=] '{print $2}'"
self.Result=[]
self.ssh_authorized_user={}
## 口令生存周期检查
def check_passmax(self):
result= {"name":"口令生存周期检查", "level":"middle","service":[""],"user":["root"],"filename":["/etc/login.defs"],"port":[""],"src_port":[""],"dest_port":[""],"pid":[""],"protocol":[""],"check":True}
try:
shell_process = os.popen(self.passmax).read()
if 0< int(shell_process)<=90:
result["msg"]="口令生成周期为%s" %shell_process
else:
result["check"]=False
result["msg"]="口令生成周期为%s" %shell_process
except Exception as e:
result["error"]=str(e)
finally:
self.Result.append(result)
## 口令更改最少时间间隔
def check_passmin(self):
result= {"name":"口令更改最少时间间隔", "level":"middle","service":[""],"user":["root"],"filename":["/etc/login.defs"],"port":[""],"src_port":[""],"dest_port":[""],"pid":[""],"protocol":[""],"check":True}
try:
shell_process = os.popen(self.passmin).read()
if int(shell_process)>=6:
result["msg"]="口令更改最小时间间隔为%s天,符合要求" %shell_process
else:
result["check"]=False
result["msg"]="口令更改最小时间间隔为%s天,不符合要求,建议设置大于等于6天" %shell_process
except Exception as e:
result["error"]=str(e)
finally:
self.Result.append(result)
## 口令最小长度
def check_passlen(self):
result= {"name":"口令最小长度", "level":"middle","service":[""],"user":["root"],"filename":["/etc/login.defs"],"port":[""],"src_port":[""],"dest_port":[""],"pid":[""],"protocol":[""],"check":True}
try:
shell_process = os.popen(self.passlen).read()
if int(shell_process)>=8:
result["msg"]="口令最小长度为%s,符合要求" %shell_process
else:
result["check"]=False
result["msg"]="令最小长度为%s,不符合要求,建议设置最小长度大于等于8" %shell_process
except Exception as e:
result["error"]=str(e)
finally:
self.Result.append(result)
## 检查空弱口令
def check_empty(self):
result= {"name":"检查空弱口令", "level":"critical","service":[""],"user":["root"],"filename":["/etc/shadow"],"port":[""],"src_port":[""],"dest_port":[""],"pid":[""],"protocol":[""],"check":True}
try:
shell_process = os.popen("awk -F: 'length($2)==0 {print $1}' /etc/shadow 2>/dev/null").read().splitlines()
if not shell_process:
result["msg"]="不存在空弱口令账户"
else:
result["check"]=False
result["msg"]="存在空弱口令账户%s"%str(shell_process)
except Exception as e:
result["error"]=str(e)
finally:
self.Result.append(result)
## 检查sudo权限异常用户
def check_sudo(self):
result= {"name":"检查sudo权限异常用户", "level":"critical","service":[""],"user":["root"],"filename":["/etc/sudoers"],"port":[""],"src_port":[""],"dest_port":[""],"pid":[""],"protocol":[""],"check":True}
try:
shell_process = os.popen("cat /etc/sudoers 2>/dev/null |grep -v '#'|grep 'ALL=(ALL)'|awk '{print $1}'").read().splitlines()
userinfo=[]
for user in shell_process:
if user.replace("n", "") != 'root':
userinfo.append(user)
if not userinfo:
result["msg"]="不存在sduo特权异常用户"
else:
result["check"]=False
result["msg"]="存在sudo权限异常用户%s"%str(userinfo)
except Exception as e:
result["error"]=str(e)
finally:
self.Result.append(result)
## 检查特权用户组
def check_gid(self):
result= {"name":"检查特权用户组", "level":"critical","service":[""],"user":["root"],"filename":["/etc/passwd"],"port":[""],"src_port":[""],"dest_port":[""],"pid":[""],"protocol":[""],"check":True}
try:
shell_process = os.popen("cat /etc/passwd | grep '/bin/bash' | awk -F: '$4==0 {print $1}' 2>/dev/null").read().splitlines()
userinfo=[]
for user in shell_process:
if user.replace("n", "") != 'root':
userinfo.append(user)
if not userinfo:
result["msg"]="不存在特权组用户"
else:
result["check"]=False
result["msg"]="存在特权组用户%s"%str(userinfo)
except Exception as e:
result["error"]=str(e)
finally:
self.Result.append(result)
## 口令过期警告时间天数
def check_passage(self):
result= {"name":"口令过期警告时间天数", "level":"info","service":[""],"user":["root"],"filename":["/etc/login.defs"],"port":[""],"src_port":[""],"dest_port":[""],"pid":[""],"protocol":[""],"check":True}
try:
shell_process = os.popen(self.passage).read()
if int(shell_process)>=30:
result["msg"]="口令过期警告时间天数为%s,符合要求" %shell_process
else:
result["check"]=False
result["msg"]="口令过期警告时间天数为%s,不符合要求,建议设置大于等于30并小于口令生存周期" %shell_process
except Exception as e:
result["error"]=str(e)
finally:
self.Result.append(result)
## 找非root账号UID为0的账号
def check_uid(self):
result= {"name":"查找非root账号UID为0的账号", "level":"critical","service":["ssh","sshd"],"user":["root"],"filename":["/etc/passwd"],"port":[""],"src_port":[""],"dest_port":[""],"pid":[""],"protocol":[""],"check":True}
try:
shell_process = os.popen(self.uid).read().splitlines()
if "0" not in shell_process:
result["msg"]="不存在非root账号的账号UID为0,符合要求"
else:
result["check"]=False
result["msg"]="存在非root账号的账号UID为0,不符合要求"
except Exception as e:
result["error"]=str(e)
finally:
self.Result.append(result)
## 检查是否允许root账号登录
def check_sshdconfig(self):
result= {"name":"检查是否允许root账号登录", "level":"high","service":["ssh","sshd"],"user":["root"],"filename":["/etc/ssh/sshd_config"],"port":["22"],"src_port":[""],"dest_port":[""],"pid":[""],"protocol":[""],"check":True}
try:
shell_process = os.popen(self.sshd_config).read().splitlines()
if shell_process:
result["msg"]="root不能程登录符合要求"
else:
result["check"]=False
result["msg"]="root用户可以远程登录不符合要求"
except Exception as e:
result["error"]=str(e)
finally:
self.Result.append(result)
## 检查是否开启日志审计auditd
def check_auditd(self):
result= {"name":"检查是否开启日志审计auditd", "level":"high","service":["auditd"],"user":["root"],"filename":["/etc/ssh/sshd_config"],"port":["22"],"src_port":[""],"dest_port":[""],"pid":[""],"protocol":[""],"check":True}
try:
shell_process = os.popen("service auditd status").read().splitlines()
for info in shell_process:
if "Active: active (running)" in info:
result["msg"]="开启了日志审计auditd"
result["check"]=True
break
else:
result["check"]=False
result["msg"]="没有开启日志审计auditd"
except Exception as e:
result["error"]=str(e)
finally:
self.Result.append(result)
## 历史命令保存的最大条数检测
def check_bash_history(self):
result= {"name":"历史命令保存的最大条数检测", "level":"high","service":[""],"user":["root"],"filename":["/etc/profile"],"port":[""],"src_port":[""],"dest_port":[""],"pid":[""],"protocol":[""],"check":True}
try:
shell_process = os.popen(self.bash_histrory).read().splitlines()[0]
if int (shell_process)<=500:
result["msg"]="历史保存的最大命令条数符合要求"
else:
result["check"]=False
result["msg"]="历史保存的最大命令条数超过500条不符合要求"
except Exception as e:
result["error"]=str(e)
finally:
self.Result.append(result)
## 检查是否开启telnet
def check_open_Telnet(self):
result= {"name":"检查是否开启telnet", "level":"high","service":["telnet"],"user":["root"],"filename":["/etc/xinetd.d/telnet"],"port":[""],"src_port":[""],"dest_port":[""],"pid":[""],"protocol":[""],"check":True}
try:
shell_process=os.popen("cat /etc/xinetd.d/telnet | grep disable | awk '{print $3}'")[0]
if shell_process!="yes":
result["msg"]="没有开启Telnet服务"
else:
result["check"]=False
result["msg"]="开启了telnet服务"
except Exception as e:
result["error"]=str(e)
finally:
self.Result.append(result)
## 查是否开启nfs服务
def check_open_nfs(self):
result= {"name":"检查是否开启nfs服务", "level":"high","service":["NFS"],"user":["root"],"filename":[""],"port":[""],"src_port":[""],"dest_port":[""],"pid":[""],"protocol":[""],"check":True}
try:
shell_process=os.popen("chkconfig --list nfs |grep on").read().splitlines()
if not shell_process:
result["msg"]="没有开启nfs服务"
else:
result["check"]=False
result["msg"]="开启了nfs服务"
except Exception as e:
result["error"]=str(e)
finally:
self.Result.append(result)
## 检查重要系统文件权限
def check_file_analysis(self):
result= {"name":"检查重要系统文件权限", "level":"high","service":[""],"user":["root"],"filename":['/etc/passwd', '/etc/shadow','/etc/group','/etc/securetty','/etc/services','/etc/xinetd.conf','/etc/grub.conf','/etc/lilo.conf'],"port":[""],"src_port":[""],"dest_port":[""],"pid":[""],"protocol":[""],"check":True}
try:
files = ['/etc/passwd', '/etc/shadow','/etc/group','/etc/securetty','/etc/services','/etc/xinetd.conf','/etc/grub.conf','/etc/lilo.conf']
file_info=[]
for file in files:
if not os.path.exists(file): continue
shell_process = os.popen("ls -l " + file + " 2>/dev/null |awk '{print $1}'").read().splitlines()
if len(shell_process) != 1: continue
if file == '/etc/passwd' and shell_process[0] != '-rw-r--r--':
info= "/etc/passwd 文件权限变更",shell_process[0]
file_info.append(info)
elif file == '/etc/shadow' and shell_process[0] != '----------':
info="/etc/shadow 文件权限变更",shell_process[0]
file_info.append(info)
elif file == '/etc/group' and shell_process[0] != '-rw-r--r--':
info= "/etc/group 文件权限变更%s",shell_process[0]
file_info.append(info)
elif file == '/etc/securetty' and shell_process[0] != '-rw-------':
info= "/etc/securetty 文件权限变更",shell_process[0]
file_info.append(info)
elif file == '/etc/services' and shell_process[0] != '-rw-------':
info= "/etc/services 文件权限变更",shell_process[0]
file_info.append(info)
elif file == '/etc/xinetd.conf' and shell_process[0] != '-rw-------':
info= "/etc/xinetd.conf 文件权限变更",shell_process[0]
file_info.append(info)
elif file == '/etc/grub.conf' and shell_process[0] != '-rw-------':
info= "/etc/grub.conf 文件权限变更",shell_process[0]
file_info.append(info)
elif file == '/etc/lilo.conf' and shell_process[0] != '-rw-------':
info="/etc/lilo.conf 文件权限变更",shell_process[0]
file_info.append(info)
if not file_info:
result["msg"]="重要系统文件权限没有变更。"
else:
result["check"]=False
result["msg"]="文件权限发生变更%s"%str(file_info)
except Exception as e:
result["error"]=str(e)
finally:
self.Result.append(result)
## 检查免密码登录
def check_authorized_keys(self):
result= {"name":"检查ssh免密码登录", "level":"critical","service":["sshd","ssh"],"user":["root"],"filename":[".ssh/authorized_keys"],"port":[""],"src_port":[""],"dest_port":[""],"pid":[""],"protocol":[""],"check":True}
try:
for dir in os.listdir('/home/'):
self.file_analysis( os.path.join('%s%s%s' % ('/home/', dir, '/.ssh/authorized_keys')),dir)
self.file_analysis('/root/.ssh/authorized_keys', 'root')
if not self.ssh_authorized_user:
result["msg"]="不存在免密码登录"
else:
result["check"]=False
result["msg"]="存在免密码登录%s"%str(self.ssh_authorized_user)
except Exception as e:
result["error"]=str(e)
finally:
self.Result.append(result)
# 分析authorized_keys文件
def file_analysis(self, file, user):
try:
if os.path.exists(file):
shell_process = os.popen("cat " + file + " 2>/dev/null |awk '{print $3}'").read().splitlines()
# print (shell_process)
if shell_process:
self.ssh_authorized_user=shell_process
#print (self.ssh_authorized_user)
return
except:
return
def run(self):
self.check_passmax()
self.check_passmin()
self.check_passlen()
self.check_passage()
self.check_uid()
self.check_sshdconfig()
self.check_auditd()
self.check_bash_history()
self.check_open_Telnet()
self.check_empty()
self.check_gid()
self.check_sudo()
self.check_open_nfs()
self.check_file_analysis()
self.check_authorized_keys()
if __name__ == '__main__':
obj=Linux_Check()
obj.run()
print (json.dumps(obj.Result,encoding='UTF-8', ensure_ascii=False))
https://www.elastic.co/cn/beats/
-
轻量级(指的是agent)配置简单,i/o 资源占用小。 -
完整的一套分析体系,灵活自定义各组件。 -
可以适用任何网络架构平台目前输出支持ES,logstash,kafka,redis,file,console,…
-
要想真的高效的用起来首先分析平台搭建比较麻烦,需要依赖很多组件去实现一套完整的流程,下图是国内美团的架构,比较复杂。
https://osquery.io/
-
查看下面的所有表(.tables)
-
查询系统用户 (select * from user)
-
查询进程打开的文件 (select * from process_open_files)
免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉。
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论