一、前言
在甲方做安全建设的时候,不可避免遇到大数量级的数据进行处理、分析的任务,比如分析整个公司的WAF日志、进程派生数据、K8s日志等等去发现风险,尤其在初期做反入侵的策略的时候,需要及时告警但又频繁修改策略,我们就需要一个能够编写策略而且可以顺利平滑发布新策略的系统。
这个系统是ELKEID HUB(由字节跳动开发并且开源),一个策略编写和分析平台,可以满足上面的需求,并且经过了几个月实践下来,对其比较满意。
二、搭建处理日均数亿的系统
ELKEID HUB需要搭配其他系统使用,需要自行配置Kafka、Elasticsearch、Kibana。
Kafka:它主要用于构建实时数据管道和流处理应用程序。在大数据生态系统中,数据的产生和处理往往是持续不断的,像网站的用户行为日志。
Elasticsearch:是一个分布式的、基于 RESTful API 的搜索和数据分析引擎。
Kibana:让用户能够更直观地理解和分析存储在 Elasticsearch 中的数据(后面用来查询分析数据)。
2.1、ELKEID HUB
Elkeid HUB 是一款由 Elkeid Team 维护的规则/事件处理引擎,支持流式/离线(社区版尚未支持)数据处理。初衷是通过标准化的抽象语法/规则来解决复杂的数据/事件处理与外部系统联动需求。
参考文档:https://github.com/bytedance/Elkeid/blob/main/elkeidup/deploy_hub-zh_CN.md
这里也说一下这里的优缺点
先说下优点:性能高如json解码、正则都做了特殊优化(跟大佬交流得知)、平滑策略修改、可视化不错。
其实也有一些不足:比如ELKEID HUB很久没更新了(一些前端bug不修复),如果要使用分布式和编写插件的话需要购买商业版本(不能分布式的话,我觉得坑优点稍微大)。
推荐系统:ubtun20(ubuntu22会报不兼容)
# 配置可密钥登录
echo -e 'nPubkeyAuthentication yes' >> /etc/ssh/sshd_config
systemctl restart sshd
# 生成密钥
cd ~/.ssh/
ssh-keygen -t rsa -b 4096
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
# 测试是否密钥是否可用
ssh 127.0.0.1
# 下载
mkdir -p /root/.elkeidup && cd /root/.elkeidup
wget https://github.com/bytedance/Elkeid/releases/download/v1.9.1.4/elkeidup_package_v1.9.1.tar.gz.00
wget https://github.com/bytedance/Elkeid/releases/download/v1.9.1.4/elkeidup_package_v1.9.1.tar.gz.01
wget https://github.com/bytedance/Elkeid/releases/download/v1.9.1.4/elkeidup_package_v1.9.1.tar.gz.02
cat elkeidup_package_v1.9.1.tar.gz.*> elkeidup_package_v1.9.1.tar.gz
tar -xf elkeidup_package_v1.9.1.tar.gz
chmod a+x /root/.elkeidup/elkeidup
# ./elkeidup init --host {ip} --hub_only 生成配置文件config_example.yaml
./elkeidup init --host 10.234.170.51--hub_only
cp config_example.yaml config.yaml
# 部署
./elkeidup deploy --hub_only
2.2、Kafka
通过容器部署Kafka,注意下面的方式无法远程访问(只能在容器内进行访问),如果有需求自行修改。
参考文档:https://github.com/bitnami/containers/tree/main/bitnami/kafka
version: "2"
services:
zookeeper:
image: docker.io/bitnami/zookeeper:3.9
ports:
-"2181:2181"
volumes:
-"zookeeper_data:/bitnami"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka1:
image: docker.io/bitnami/kafka:3.4
ports:
-"9092:9092"
volumes:
-"kafka1_data:/bitnami"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
depends_on:
- zookeeper
kafka2:
image: docker.io/bitnami/kafka:3.4
ports:
-"9092:9092"
volumes:
-"kafka2_data:/bitnami"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
depends_on:
- zookeeper
volumes:
zookeeper_data:
driver:local
kafka1_data:
driver:local
kafka2_data:
driver:local
2.3、Elasticsearch & Kibana
通过容器部署,参考官方文档,有详细的教程。
部署:Elasticsearch
官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html
sudo sysctl -w vm.max_map_count=262144
docker network create elastic
docker run --name es01 --net elastic -p 9200:9200-itd -m 4GB docker.elastic.co/elasticsearch/elasticsearch:8.15.0
# 找到token
docker logs es01 -f
# 加入其他,里面包含地址、凭据这些
token=eyJxxxxxxxxx
docker run -itd -e ENROLLMENT_TOKEN="$token"--name es02 --net elastic -it -m 4GB docker.elastic.co/elasticsearch/elasticsearch:8.15.0
docker run -itd -e ENROLLMENT_TOKEN="$token"--name es03 --net elastic -it -m 4GB docker.elastic.co/elasticsearch/elasticsearch:8.15.0
# 查看node节点
https://127.0.0.1:9200/_cat/nodes
部署:kibana
官方文档:https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html#run-kibana-docker
docker stop kib01 && docker rm kib01
docker run -itd --name kib01 --net elastic -p 5601:5601 docker.elastic.co/kibana/kibana:8.15.0
# 生成token
docker exec -it es01 /usr/share/elasticsearch/bin/elasticsearch-create-enrollment-token -s kibana
# 登陆kibana设置即可
三、从日均亿级数据实时监控黑名单demo
3.1、全量拉阿里云云日志
云日志拉取有两种方式,一种是全量PullLogs(使用消费组和不使用消费组两种方式),另外一种是按次GetLogs。
我们通过PullLogs拉取全量日志(不使用消费组)
https://www.alibabacloud.com/help/zh/sls/user-guide/log-consumption-through-java-sdk?spm=a2c63.p38356.0.0.40aa610c0uDq2l
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts;
import com.aliyun.openservices.log.common.LogGroupData;
import com.aliyun.openservices.log.common.Shard;
import com.aliyun.openservices.log.exception.LogException;
import com.aliyun.openservices.log.request.PullLogsRequest;
import com.aliyun.openservices.log.response.ListShardResponse;
import com.aliyun.openservices.log.response.PullLogsResponse;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
publicclassPullLogsDemo{
// 日志服务的服务接入点。此处以杭州为例,其它地域请根据实际情况填写
privatestaticfinalStringendpoint="cn-hangzhou.log.aliyuncs.com";
// 本示例从环境变量中获取 AccessKey ID 和 AccessKey Secret。
privatestaticfinalStringaccessKeyId=System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID");
privatestaticfinalStringaccessKeySecret=System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET");
// Project 名称
privatestaticfinalStringproject="your_project";
// LogStore 名称
privatestaticfinalStringlogStore="your_logstore";
publicstaticvoidmain(String[] args)throwsException{
// 创建日志服务 Client
Clientclient=newClient(endpoint, accessKeyId, accessKeySecret);
// 查询 LogStore 的 Shard
ListShardResponseresp= client.ListShard(project, logStore);
System.out.printf("%s has %d shardsn", logStore, resp.GetShards().size());
Map<Integer,String> cursorMap =newHashMap<Integer,String>();
for(Shard shard : resp.GetShards()){
intshardId= shard.getShardId();
// 从头开始消费,获取游标。(如果是从尾部开始消费,使用 Consts.CursorMode.END)
cursorMap.put(shardId, client.GetCursor(project, logStore, shardId,Consts.CursorMode.BEGIN).GetCursor());
}
try{
while(true){
// 从每个Shard中获取日志
for(Shard shard : resp.GetShards()){
intshardId= shard.getShardId();
PullLogsRequestrequest=newPullLogsRequest(project, logStore, shardId,1000, cursorMap.get(shardId));
PullLogsResponseresponse= client.pullLogs(request);
// 日志都在日志组(LogGroup)中,按照逻辑拆分即可。
List<LogGroupData> logGroups = response.getLogGroups();
System.out.printf("Get %d logGroup from logStore:%s:tShard:%dn", logGroups.size(), logStore, shardId);
// 完成处理拉取的日志后,移动游标。
cursorMap.put(shardId, response.getNextCursor());
}
}
}catch(LogException e){
System.out.println("error code :"+ e.GetErrorCode());
System.out.println("error message :"+ e.GetErrorMessage());
throw e;
}
}
}
再打入Kafka即可,速度量大的话并且建议多线程,不然拉取和推送到Kafka的速度会慢。
注意:
1、全量PullLogs由于Shard机制,会导致消费的日志顺序并不是按照顺序排序的日志。比如如下图,如果是对前后顺序比较严格的,需要自己进行排序。
2、日志服务支持通过消费组消费数据。一个消费组由多个消费者构成,同一个消费组中的消费者共同消费一个Logstore中的数据,各个消费者不会重复消费数据。(通过api直接pull的话是可以重复拉取的)
如图:
3.2、从零配置ELKEID HUB策略抓黑客
接下来配置ELKEID HUB一个蜜罐策略,比如使用某个云密钥AK就告警,一共分成6个步骤。
1、配置输入源:规则页 -> 输入源 -> 新建
如果是空密码直接填写
{}
如果配置了密码,则根据下面配置进行修改
{"sasl.mechanism":"SCRAM-SHA-512","sasl.password":"pass","sasl.username":"user","security.protocol":"SASL_PLAINTEXT"}
2、配置输出:规则页 -> 输出 -> 新建
3、配置规则:
新建规则集:规则页 -> 规则集 -> 新建
新建规则:点击刚才新建的规则集 -> 根据图形化填写规则
4、配置项目:规则页 -> 根据下图配置规则
更多用法参考:
https://elkeid.bytedance.com/en/docs/hub/quick_start/quick_start_zh-CN.html
5、发布:规则发布 -> 规则发布
6、启动项目:规则发布 -> 项目操作 -> 启动
到这里都搭建好了,可以在kibana中查看告警的信息了,当然也可以通过插件可以推送到群里告警。
3.3、性能测试结论
机器负载:可以看到机器的cpu以及内存使用率都很低,长时间观察也没有太大的波动。(机器配置8核64G,内存一半还没用到)
处理速度:sls产生日志 -> kafka -> elkeid -> elk告警,也就差不多10多秒的延迟,处理速度很快。
elkeid状态:elkeid给出的qps是2362,结合下上面的机器负载来说,还可以更高的QPS,也就是日均几亿的数据问题不大。
四、总结
我们在一台机器8核64G的服务器搭建了整套策略分析平台,能够实时处理日均数亿的数据,虽然还有一些坑,但整体能够满足需求,可以做蛮多的策略。
当然后面可能需要写一些较为复杂的策略,可以使用Flink去做实时分析(比如时间水印机制蛮强大的),有时间后续再展开。
知识星球:目前聚焦红蓝对抗和反入侵以及AI落地。(目前已经更新46篇原创文章而非所谓资源整合搬运的公开资源,并且保持高频输出),微信公众号的文章是来源于先矛
后盾的知识星球里面(只有公开了少部分文章)。
社群:加我lufeirider微信进群。
往期历史文章
TCP协议区分windows和linux实践 - 远程系统识别
域安全-PrintNightmare打域控漏洞的一次艰难利用
原文始发于微信公众号(lufeisec):如何单机实时分析日均数亿安全日志?
- 左青龙
- 微信扫一扫
- 右白虎
- 微信扫一扫
评论