小甲方“零经费”如何建设自己的安全运营平台

admin 2022年3月20日20:07:07评论132 views字数 5430阅读18分6秒阅读模式
0x00 前言


由于工作变动,入职了一家小甲方,由于安全只有一个人,可谓是苦不堪言,猛男落泪.JPG。

在堆完一些基础的设备后,由于告警分散,无法关联分析,设备之间无法联动等等,一个安全运营平台的需求就特别明显。由于经费和发展规模的因素等,一开始试用了免费版本的siem平台(ELK、opensearch等),效果不是很理想,于是我决定采用开源组件拼凑一个安全运营平台。




0x01 平台建设的目标

1、纳管所有安全告警

2、接入所有安全所需日志,提供关联分析以及溯源

3、自动化处置

4、漏洞管理

5、安全态势可视化(提供给值班人员观看)


PS:本文只放出告警对接的部分,其他部分感兴趣可以找我一起交流





0x02 架构

数据采集工具使用winlogbeat、filebeat、以及设备自身的syslog,所有的数据汇总到kafka队列中

kafka的消费者有两个角色:

    1)Flink:做实时分析

    2)ClickHouse:做全量存储,提供日志检索以及溯源分析

Flink分析的结果存储到MySQL中,最终使用python搭建一个Web平台,用于各种功能管理、展示、API调用等

小甲方“零经费”如何建设自己的安全运营平台







0x03 部署

详细的生产集群部署参考官方文档就好,这里放出单机部署的方法以及对接开源蜜罐HFish告警的过程,提供给感兴趣想测试的师傅


3.1 Logstash


前往https://www.elastic.co/cn/downloads/logstash 下载最新版本


配置文件:

input{    syslog{    port => 514    }}
filter {
if 'HFish Threat' in [message]{
grok { match =>{ "message" => "^title: %{DATA:title} |.* attack_type: %{DATA:attack_type} |.* name: %{DATA:hp_name} |.* src_ip: %{IPV4:src_ip} |.* src_port: %{DATA:src_port} |.* dst_ip: %{IPV4:dst_ip} |.* dst_port: %{DATA:dst_port} |.* time: %{TIMESTAMP_ISO8601:time} " } remove_field => ["message"] remove_field => ["priority"] remove_field => ["severity_label"] remove_field => ["facility_label"] remove_field => ["pid"] remove_field => ["facility"] remove_field => ["host"] remove_field => ["@timestamp"] remove_field => ["timestamp"] remove_field => ["timestamp8601"] remove_field => ["@version"] remove_field => ["severity"]
}}}
output{ stdout { codec => rubydebug } if [title] == "HFish Threat Alert" { kafka { bootstrap_servers => "ip:host" codec => json topic_id => "Hfish" } }}



运行启动nohup ./bin/logstash -f config/logstash.conf > /tmp/nohup_logstash.log 2&1 &




3.2 Kafka


部署zookeeper

docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper



部署kafka

docker run --name kafka01 -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=IP:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://IP:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -d  wurstmeister/kafka



3.3 Clickhouse


docker部署

docker run -d --name ch-server --ulimit nofile=262144:262144 -p 8123:8123 -p 9000:9000 -p 9009:9009 yandex/clickhouse-server



设置密码登录

docker exec  -it ch-server bashcd /etc/clickhouse-serverPASSWORD=$(base64 < /dev/urandom | head -c8); echo "clickhouse~"; echo -n "clickhouse~" | sha256sum | tr -d '-'vim user.xml修改用户密码,并将<password></password>修改为 <password_sha256_hex> 密码密文 </password_sha256_hex>



创建kafka同步表,用于存放全量数据


SQL 1: 

CREATE TABLE kafka_hfish (    src_port String,    src_ip String,    dst_ip String,    program String,    logsource String,    title String,    dst_port String,    time DateTime) Engine = MergeTreePARTITION BY toYYYYMMDD(time)ORDER BY (time);




SQL 2:

CREATE TABLE kafka_hfish_queue (    src_port String,    src_ip String,    dst_ip String,    program String,    logsource String,    title String,    dst_port String,    time DateTime)ENGINE = KafkaSETTINGS kafka_broker_list = 'IP:HOST',       kafka_topic_list = 'Hfish',       kafka_group_name = 'clickhouse',       kafka_format = 'JSONEachRow';


     

   

SQL 3: 

CREATE MATERIALIZED VIEW kafka_readings_view TO kafka_hfish ASSELECT src_port, src_ip, dst_ip, program, logsource, title, dst_port, timeFROM kafka_hfish_queue;




4.4 Flink


建一个docker网络

docker network create flink


docker部署

docker run -t -d --name jmr --network flink -e JOB_MANAGER_RPC_ADDRESS=jmr -p 8081:8081  flink:1.14.4-scala_2.12 jobmanager



docker run -t -d --name tmr --network flink -e JOB_MANAGER_RPC_ADDRESS=jmr   flink:1.14.4-scala_2.12 taskmanager







0x04 对接HFish告警

1、在HFish上设置syslog发送告警

小甲方“零经费”如何建设自己的安全运营平台


2、Logstash配置文件中配置好日志切割的正则,上个小结已经给出配置,切割结果如下图

小甲方“零经费”如何建设自己的安全运营平台



PS:此时我发现,HFish并没有将告警聚合就直接丢出来了,所以我们得在计算引擎上去做聚合,不然最终一次攻击会产生非常多告警工单


3、Flink写对应的规则任务,上传到Flink平台中执行


代码逻辑:消费kafka并将结果传入mysql。为了抑制上面提到的重复syslog消息,在Flink代码中开一个window时间窗口,再reduce去做一个聚合,最终传入mysql


下面放出关键代码:

public class HFish_Alert {    public static void main(String[] args) throws Exception{        //获取运行时环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(1);        //kafka配置文件        Properties props = new Properties();        props.put("bootstrap.servers", "IP:9092");        props.put("zookeeper.connect","IP:2181");        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  //key 反序列化        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //value 反序列化

SingleOutputStreamOperator<Entity> StreamRecord = env.addSource(new FlinkKafkaConsumer<>( "Hfish", new SimpleStringSchema(),//String 序列 props))
.map(string -> JSON.parseObject(string, Entity.class))
.setParallelism(1)
.keyBy((KeySelector<Entity, String>) Entity::getsrc_ip)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.reduce(new ReduceFunction<Entity>() { @Override public Entity reduce(Entity value1, Entity value2) throws Exception {
System.out.println("" + value1); System.out.println("" + value2); System.out.println("=============="); if (null != value2.hp_name) { if (!value1.hp_name.contains(value2.hp_name)) { value1.hp_name = value1.hp_name + "," + value2.hp_name; } } if (!value1.dst_ip.contains(value2.dst_ip)) { value1.dst_ip = value1.dst_ip + "," + value2.dst_ip; }
if (!value1.dst_port.contains(value2.dst_port)) { value1.dst_port = value1.dst_port + "," + value2.dst_port; }
if (!value1.attack_type.contains("attack")) { if (!value1.attack_type.contains(value2.attack_type)) { value1.attack_type = value1.attack_type + "," + value2.attack_type; } } System.out.println("" + value1); System.out.println("---------------------nnnn"); return value1; } }); StreamRecord.addSink(new MysqlSink());
env.execute("HFish_Alert");
}}



4、结果展示


不会写前端,轻点喷


告警List视图

小甲方“零经费”如何建设自己的安全运营平台


告警处置视图,其中威胁情报一栏,在有互联网攻击告警时,会通过自动化脚本查询IP的威胁情报,然后插入数据库中

小甲方“零经费”如何建设自己的安全运营平台



0x05 安全性如何保障

1、web平台登录功能开启OPT二次认证

2、kafka设置应用ACL、禁用不使用的四字命令

3、网络权限最小化开放

4、组件之间传输开启SSL
5、摆烂





0x06 总结
日子太苦了,等师傅一句:我带你


1、本团队任何实战都是已授权的、任何技术及工具也仅用于学习分享,请勿用于任何非法、违法活动,如有违背请自行承担后果,感谢大家的支持!!!


2、本团队一贯秉承Free共享的精神,但是也有大家的小愿望,因此本团队所有分享工具,严禁不经过授权的公开分享,被关注就是对我们付出的精力做的最大的支持。


END



如您有任何投稿、问题、建议、需求、合作、后台留言NOVASEC公众号!

小甲方“零经费”如何建设自己的安全运营平台

或添加NOVASEC-MOYU 以便于及时回复。

小甲方“零经费”如何建设自己的安全运营平台


感谢大哥们的对NOVASEC的支持点赞和关注

加入我们与萌新一起成长吧!


本团队任何技术及文件仅用于学习分享,请勿用于任何违法活动,感谢大家的支持!



原文始发于微信公众号(NOVASEC):小甲方“零经费”如何建设自己的安全运营平台

  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2022年3月20日20:07:07
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   小甲方“零经费”如何建设自己的安全运营平台http://cn-sec.com/archives/834637.html

发表评论

匿名网友 填写信息