如何单机实时分析日均数亿安全日志?

admin 2024年11月27日11:05:01评论9 views字数 7473阅读24分54秒阅读模式

一、前言

在甲方做安全建设的时候,不可避免遇到大数量级的数据进行处理、分析的任务,比如分析整个公司的WAF日志、进程派生数据、K8s日志等等去发现风险,尤其在初期做反入侵的策略的时候,需要及时告警但又频繁修改策略,我们就需要一个能够编写策略而且可以顺利平滑发布新策略的系统。

这个系统是ELKEID HUB(由字节跳动开发并且开源),一个策略编写和分析平台,可以满足上面的需求,并且经过了几个月实践下来,对其比较满意。

二、搭建处理日均数亿的系统

ELKEID HUB需要搭配其他系统使用,需要自行配置Kafka、Elasticsearch、Kibana。

Kafka:它主要用于构建实时数据管道和流处理应用程序。在大数据生态系统中,数据的产生和处理往往是持续不断的,像网站的用户行为日志。

Elasticsearch:是一个分布式的、基于 RESTful API 的搜索和数据分析引擎。

Kibana:让用户能够更直观地理解和分析存储在 Elasticsearch 中的数据(后面用来查询分析数据)。

2.1、ELKEID HUB

Elkeid HUB 是一款由 Elkeid Team 维护的规则/事件处理引擎,支持流式/离线(社区版尚未支持)数据处理。初衷是通过标准化的抽象语法/规则来解决复杂的数据/事件处理与外部系统联动需求。

参考文档:https://github.com/bytedance/Elkeid/blob/main/elkeidup/deploy_hub-zh_CN.md

如何单机实时分析日均数亿安全日志?
0

这里也说一下这里的优缺点

先说下优点:性能高如json解码、正则都做了特殊优化(跟大佬交流得知)、平滑策略修改、可视化不错。

其实也有一些不足:比如ELKEID HUB很久没更新了(一些前端bug不修复),如果要使用分布式和编写插件的话需要购买商业版本(不能分布式的话,我觉得坑优点稍微大)。

推荐系统:ubtun20(ubuntu22会报不兼容)

# 配置可密钥登录
echo -e 'nPubkeyAuthentication yes' >> /etc/ssh/sshd_config
systemctl restart sshd


# 生成密钥
cd ~/.ssh/
ssh-keygen -t rsa -b 4096
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys 


# 测试是否密钥是否可用
ssh 127.0.0.1
# 下载
mkdir -p /root/.elkeidup && cd /root/.elkeidup


wget https://github.com/bytedance/Elkeid/releases/download/v1.9.1.4/elkeidup_package_v1.9.1.tar.gz.00
wget https://github.com/bytedance/Elkeid/releases/download/v1.9.1.4/elkeidup_package_v1.9.1.tar.gz.01
wget https://github.com/bytedance/Elkeid/releases/download/v1.9.1.4/elkeidup_package_v1.9.1.tar.gz.02
cat elkeidup_package_v1.9.1.tar.gz.*> elkeidup_package_v1.9.1.tar.gz


tar -xf elkeidup_package_v1.9.1.tar.gz
chmod a+x /root/.elkeidup/elkeidup


# ./elkeidup init --host {ip} --hub_only 生成配置文件config_example.yaml
./elkeidup init --host 10.234.170.51--hub_only
cp config_example.yaml config.yaml 


# 部署
./elkeidup deploy --hub_only

2.2、Kafka

通过容器部署Kafka,注意下面的方式无法远程访问(只能在容器内进行访问),如果有需求自行修改。

参考文档:https://github.com/bitnami/containers/tree/main/bitnami/kafka

version: "2"


services:
  zookeeper:
    image: docker.io/bitnami/zookeeper:3.9
    ports:
-"2181:2181"
    volumes:
-"zookeeper_data:/bitnami"
    environment:
- ALLOW_ANONYMOUS_LOGIN=yes
  kafka1:
    image: docker.io/bitnami/kafka:3.4
    ports:
-"9092:9092"
    volumes:
-"kafka1_data:/bitnami"
    environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
    depends_on:
- zookeeper
  kafka2:
    image: docker.io/bitnami/kafka:3.4
    ports:
-"9092:9092"
    volumes:
-"kafka2_data:/bitnami"
    environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
    depends_on:
- zookeeper


volumes:
  zookeeper_data:
    driver:local
  kafka1_data:
    driver:local
  kafka2_data:
    driver:local

2.3、Elasticsearch & Kibana

通过容器部署,参考官方文档,有详细的教程。

部署:Elasticsearch

官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html

sudo sysctl -w vm.max_map_count=262144


docker network create elastic
docker run --name es01 --net elastic -p 9200:9200-itd -m 4GB docker.elastic.co/elasticsearch/elasticsearch:8.15.0
# 找到token
docker logs es01 -f


# 加入其他,里面包含地址、凭据这些
token=eyJxxxxxxxxx
docker run -itd -e ENROLLMENT_TOKEN="$token"--name es02 --net elastic -it -m 4GB docker.elastic.co/elasticsearch/elasticsearch:8.15.0
docker run -itd -e ENROLLMENT_TOKEN="$token"--name es03 --net elastic -it -m 4GB docker.elastic.co/elasticsearch/elasticsearch:8.15.0


# 查看node节点
https://127.0.0.1:9200/_cat/nodes

部署:kibana

官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html#run-kibana-docker

docker stop kib01 && docker rm kib01
docker run -itd --name kib01 --net elastic -p 5601:5601 docker.elastic.co/kibana/kibana:8.15.0


# 生成token
docker exec -it es01 /usr/share/elasticsearch/bin/elasticsearch-create-enrollment-token -s kibana


# 登陆kibana设置即可

三、从日均亿级数据实时监控黑名单demo

3.1、全量拉阿里云云日志

云日志拉取有两种方式,一种是全量PullLogs(使用消费组和不使用消费组两种方式),另外一种是按次GetLogs。

我们通过PullLogs拉取全量日志(不使用消费组)

https://www.alibabacloud.com/help/zh/sls/user-guide/log-consumption-through-java-sdk?spm=a2c63.p38356.0.0.40aa610c0uDq2l

import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.log.common.Shard;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.PullLogsRequest;
import com.aliyun.openservices.log.response.ListShardResponse;
import com.aliyun.openservices.log.response.PullLogsResponse;


import java.util.HashMap;
import java.util.List;
import java.util.Map;


publicclassPullLogsDemo{
// 日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写
privatestaticfinalStringendpoint="cn-hangzhou.log.aliyuncs.com";
// 本示例从环境变量中获取 AccessKey ID 和 AccessKey Secret。
privatestaticfinalStringaccessKeyId=System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
privatestaticfinalStringaccessKeySecret=System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// Project 名称
privatestaticfinalStringproject="your_project";
// LogStore 名称
privatestaticfinalStringlogStore="your_logstore";


publicstaticvoidmain(String[] args)throwsException{
// 创建日志服务 Client
Clientclient=newClient(endpoint, accessKeyId, accessKeySecret);
// 查询 LogStore 的 Shard
ListShardResponseresp= client.ListShard(project, logStore);
System.out.printf("%s has %d shardsn", logStore, resp.GetShards().size());
Map<Integer,String> cursorMap =newHashMap<Integer,String>();
for(Shard shard : resp.GetShards()){
intshardId= shard.getShardId();
// 从头开始消费,获取游标。(如果是从尾部开始消费,使用 Consts.CursorMode.END)
            cursorMap.put(shardId, client.GetCursor(project, logStore, shardId,Consts.CursorMode.BEGIN).GetCursor());
}
try{
while(true){
// 从每个Shard中获取日志
for(Shard shard : resp.GetShards()){
intshardId= shard.getShardId();
PullLogsRequestrequest=newPullLogsRequest(project, logStore, shardId,1000, cursorMap.get(shardId));
PullLogsResponseresponse= client.pullLogs(request);
// 日志都在日志组(LogGroup)中,按照逻辑拆分即可。
List<LogGroupData> logGroups = response.getLogGroups();
System.out.printf("Get %d logGroup from logStore:%s:tShard:%dn", logGroups.size(), logStore, shardId);
// 完成处理拉取的日志后,移动游标。
                    cursorMap.put(shardId, response.getNextCursor());
}
}
}catch(LogException e){
System.out.println("error code :"+ e.GetErrorCode());
System.out.println("error message :"+ e.GetErrorMessage());
throw e;
}
}
}

再打入Kafka即可,速度量大的话并且建议多线程,不然拉取和推送到Kafka的速度会慢。

如何单机实时分析日均数亿安全日志?
1

注意:

1、全量PullLogs由于Shard机制,会导致消费的日志顺序并不是按照顺序排序的日志。比如如下图,如果是对前后顺序比较严格的,需要自己进行排序。

2、日志服务支持通过消费组消费数据。一个消费组由多个消费者构成,同一个消费组中的消费者共同消费一个Logstore中的数据,各个消费者不会重复消费数据。(通过api直接pull的话是可以重复拉取的)

如图:

如何单机实时分析日均数亿安全日志?
2

3.2、从零配置ELKEID HUB策略抓黑客

接下来配置ELKEID HUB一个蜜罐策略,比如使用某个云密钥AK就告警,一共分成6个步骤。

1、配置输入源:规则页 -> 输入源 -> 新建

如何单机实时分析日均数亿安全日志?
3

如果是空密码直接填写

{}

如果配置了密码,则根据下面配置进行修改

{"sasl.mechanism":"SCRAM-SHA-512","sasl.password":"pass","sasl.username":"user","security.protocol":"SASL_PLAINTEXT"}
如何单机实时分析日均数亿安全日志?
4

2、配置输出:规则页 -> 输出 -> 新建

如何单机实时分析日均数亿安全日志?
5
如何单机实时分析日均数亿安全日志?
6

3、配置规则:

新建规则集:规则页 -> 规则集 -> 新建

如何单机实时分析日均数亿安全日志?
7

新建规则:点击刚才新建的规则集 -> 根据图形化填写规则

如何单机实时分析日均数亿安全日志?
8

4、配置项目:规则页 -> 根据下图配置规则

如何单机实时分析日均数亿安全日志?
9

更多用法参考:

https://elkeid.bytedance.com/en/docs/hub/quick_start/quick_start_zh-CN.html

5、发布:规则发布 -> 规则发布

6、启动项目:规则发布 -> 项目操作 -> 启动

如何单机实时分析日均数亿安全日志?
10

到这里都搭建好了,可以在kibana中查看告警的信息了,当然也可以通过插件可以推送到群里告警。

如何单机实时分析日均数亿安全日志?
11

3.3、性能测试结论

机器负载:可以看到机器的cpu以及内存使用率都很低,长时间观察也没有太大的波动。(机器配置8核64G,内存一半还没用到)

如何单机实时分析日均数亿安全日志?
12

处理速度:sls产生日志 -> kafka -> elkeid -> elk告警,也就差不多10多秒的延迟,处理速度很快。

elkeid状态:elkeid给出的qps是2362,结合下上面的机器负载来说,还可以更高的QPS,也就是日均几亿的数据问题不大。

如何单机实时分析日均数亿安全日志?
13

四、总结

我们在一台机器8核64G的服务器搭建了整套策略分析平台,能够实时处理日均数亿的数据,虽然还有一些坑,但整体能够满足需求,可以做蛮多的策略。

当然后面可能需要写一些较为复杂的策略,可以使用Flink去做实时分析(比如时间水印机制蛮强大的),有时间后续再展开。

知识星球:目前聚焦红蓝对抗反入侵以及AI落地。(目前已经更新46篇原创文章而非所谓资源整合搬运的公开资源,并且保持高频输出),微信公众号的文章是来源于先矛

后盾的知识星球里面(只有公开了少部分文章)。

如何单机实时分析日均数亿安全日志?如何单机实时分析日均数亿安全日志?

社群:加我lufeirider微信进群。

往期历史文章

Redis getshell失败可能因为这三个小坑

TCP协议区分windows和linux实践 - 远程系统识别

域安全-PrintNightmare打域控漏洞的一次艰难利用

三条命令查杀冰蝎、哥斯拉内存马

某中间件反序列化链曲折调试

java内存马深度利用:窃取明文、钓鱼

k8s被黑真能溯源到攻击者吗?

“VT全绿”-手动patch exe免杀

最近CDN供应链事件的曲折分析与应对-业务安全

加载数据集或模型可能就中毒!大模型供应链安全

AI与基础安全结合的新的攻击面

AI落地-蓝军之默认密码获取

BootCDN供应链攻击分析与应对

挖洞技巧-扩展攻击面

weblogic-2019-2725exp回显构造

WEB越权-劝你多删参数值

原文始发于微信公众号(lufeisec):如何单机实时分析日均数亿安全日志?

免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉。
  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2024年11月27日11:05:01
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   如何单机实时分析日均数亿安全日志?https://cn-sec.com/archives/3433701.html
                  免责声明:文章中涉及的程序(方法)可能带有攻击性,仅供安全研究与教学之用,读者将其信息做其他用途,由读者承担全部法律及连带责任,本站不承担任何法律及连带责任;如有问题可邮件联系(建议使用企业邮箱或有效邮箱,避免邮件被拦截,联系方式见首页),望知悉.

发表评论

匿名网友 填写信息