由于工作变动,入职了一家小甲方,由于安全只有一个人,可谓是苦不堪言,猛男落泪.JPG。
在堆完一些基础的设备后,由于告警分散,无法关联分析,设备之间无法联动等等,一个安全运营平台的需求就特别明显。由于经费和发展规模的因素等,一开始试用了免费版本的siem平台(ELK、opensearch等),效果不是很理想,于是我决定采用开源组件拼凑一个安全运营平台。
1、纳管所有安全告警
2、接入所有安全所需日志,提供关联分析以及溯源
3、自动化处置
4、漏洞管理
5、安全态势可视化(提供给值班人员观看)
PS:本文只放出告警对接的部分,其他部分感兴趣可以找我一起交流
数据采集工具使用winlogbeat、filebeat、以及设备自身的syslog,所有的数据汇总到kafka队列中
kafka的消费者有两个角色:
1)Flink:做实时分析
2)ClickHouse:做全量存储,提供日志检索以及溯源分析
Flink分析的结果存储到MySQL中,最终使用python搭建一个Web平台,用于各种功能管理、展示、API调用等
详细的生产集群部署参考官方文档就好,这里放出单机部署的方法以及对接开源蜜罐HFish告警的过程,提供给感兴趣想测试的师傅
3.1 Logstash
前往https://www.elastic.co/cn/downloads/logstash 下载最新版本
配置文件:
input{
syslog{
port => 514
}
}
filter {
if 'HFish Threat' in [message]{
grok {
match =>{
"message" => "^title: %{DATA:title} |.* attack_type: %{DATA:attack_type} |.* name: %{DATA:hp_name} |.* src_ip: %{IPV4:src_ip} |.* src_port: %{DATA:src_port} |.* dst_ip: %{IPV4:dst_ip} |.* dst_port: %{DATA:dst_port} |.* time: %{TIMESTAMP_ISO8601:time} "
}
remove_field => ["message"]
remove_field => ["priority"]
remove_field => ["severity_label"]
remove_field => ["facility_label"]
remove_field => ["pid"]
remove_field => ["facility"]
remove_field => ["host"]
remove_field => ["@timestamp"]
remove_field => ["timestamp"]
remove_field => ["timestamp8601"]
remove_field => ["@version"]
remove_field => ["severity"]
}
}
}
output{
stdout { codec => rubydebug }
if [title] == "HFish Threat Alert" {
kafka {
bootstrap_servers => "ip:host"
codec => json
topic_id => "Hfish"
}
}
}
运行启动
nohup ./bin/logstash -f config/logstash.conf > /tmp/nohup_logstash.log 2&1 &
3.2 Kafka
部署zookeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
部署kafka
docker run --name kafka01 -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=IP:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://IP:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -d wurstmeister/kafka
3.3 Clickhouse
docker部署
docker run -d --name ch-server --ulimit nofile=262144:262144 -p 8123:8123 -p 9000:9000 -p 9009:9009 yandex/clickhouse-server
设置密码登录
docker exec -it ch-server bash
cd /etc/clickhouse-server
PASSWORD=$(base64 < /dev/urandom | head -c8); echo "clickhouse~"; echo -n "clickhouse~" | sha256sum | tr -d '-'
vim user.xml修改用户密码,并将<password></password>修改为 <password_sha256_hex> 密码密文 </password_sha256_hex>
创建kafka同步表,用于存放全量数据
SQL 1:
CREATE TABLE kafka_hfish (
src_port String,
src_ip String,
dst_ip String,
program String,
logsource String,
title String,
dst_port String,
time DateTime
) Engine = MergeTree
PARTITION BY toYYYYMMDD(time)
ORDER BY (time);
SQL 2:
CREATE TABLE kafka_hfish_queue (
src_port String,
src_ip String,
dst_ip String,
program String,
logsource String,
title String,
dst_port String,
time DateTime
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'IP:HOST',
kafka_topic_list = 'Hfish',
kafka_group_name = 'clickhouse',
kafka_format = 'JSONEachRow';
SQL 3:
CREATE MATERIALIZED VIEW kafka_readings_view TO kafka_hfish AS
SELECT src_port, src_ip, dst_ip, program, logsource, title, dst_port, time
FROM kafka_hfish_queue;
4.4 Flink
建一个docker网络
docker network create flink
docker部署
docker run -t -d --name jmr --network flink -e JOB_MANAGER_RPC_ADDRESS=jmr -p 8081:8081 flink:1.14.4-scala_2.12 jobmanager
docker run -t -d --name tmr --network flink -e JOB_MANAGER_RPC_ADDRESS=jmr flink:1.14.4-scala_2.12 taskmanager
1、在HFish上设置syslog发送告警
2、Logstash配置文件中配置好日志切割的正则,上个小结已经给出配置,切割结果如下图
PS:此时我发现,HFish并没有将告警聚合就直接丢出来了,所以我们得在计算引擎上去做聚合,不然最终一次攻击会产生非常多告警工单
3、Flink写对应的规则任务,上传到Flink平台中执行
代码逻辑:消费kafka并将结果传入mysql。为了抑制上面提到的重复syslog消息,在Flink代码中开一个window时间窗口,再reduce去做一个聚合,最终传入mysql
下面放出关键代码:
public class HFish_Alert {
public static void main(String[] args) throws Exception{
//获取运行时环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//kafka配置文件
Properties props = new Properties();
props.put("bootstrap.servers", "IP:9092");
props.put("zookeeper.connect","IP:2181");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //key 反序列化
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //value 反序列化
SingleOutputStreamOperator<Entity> StreamRecord = env.addSource(new FlinkKafkaConsumer<>(
"Hfish",
new SimpleStringSchema(),//String 序列
props))
.map(string -> JSON.parseObject(string, Entity.class))
.setParallelism(1)
.keyBy((KeySelector<Entity, String>) Entity::getsrc_ip)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.reduce(new ReduceFunction<Entity>() {
@Override
public Entity reduce(Entity value1, Entity value2) throws Exception {
System.out.println("" + value1);
System.out.println("" + value2);
System.out.println("==============");
if (null != value2.hp_name) {
if (!value1.hp_name.contains(value2.hp_name)) {
value1.hp_name = value1.hp_name + "," + value2.hp_name;
}
}
if (!value1.dst_ip.contains(value2.dst_ip)) {
value1.dst_ip = value1.dst_ip + "," + value2.dst_ip;
}
if (!value1.dst_port.contains(value2.dst_port)) {
value1.dst_port = value1.dst_port + "," + value2.dst_port;
}
if (!value1.attack_type.contains("attack")) {
if (!value1.attack_type.contains(value2.attack_type)) {
value1.attack_type = value1.attack_type + "," + value2.attack_type;
}
}
System.out.println("" + value1);
System.out.println("---------------------nnnn");
return value1;
}
});
StreamRecord.addSink(new MysqlSink());
env.execute("HFish_Alert");
}
}
4、结果展示
不会写前端,轻点喷
告警List视图
告警处置视图,其中威胁情报一栏,在有互联网攻击告警时,会通过自动化脚本查询IP的威胁情报,然后插入数据库中
1、web平台登录功能开启OPT二次认证
2、kafka设置应用ACL、禁用不使用的四字命令
3、网络权限最小化开放
1、本团队任何实战都是已授权的、任何技术及工具也仅用于学习分享,请勿用于任何非法、违法活动,如有违背请自行承担后果,感谢大家的支持!!!
2、本团队一贯秉承Free共享的精神,但是也有大家的小愿望,因此本团队所有分享工具,严禁不经过授权的公开分享,被关注就是对我们付出的精力做的最大的支持。
END
原文始发于微信公众号(NOVASEC):小甲方“零经费”如何建设自己的安全运营平台
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论