OCA(The Open Cybersecurity Alliance 开放网络安全联盟),根据其2021年4月在Linkedin上发布的公告称,其将承接IACD(Integrated Adaptive Cyber Defense,集成的自适应防护)成果转化的工作,并称其首要任务是增加和改善网络安全产品之间的互操作性。这是自2019年IACD项目公开信息停更以来,最让人兴奋的消息。由于OCA项目涉及的内容和参与的企业成员较多,且一直在变化,计划通过撰写系列文章对其整体情况和详细技术内容进行介绍。
在OCA基本情况概述中介绍到,Kestrel是一种威胁狩猎语言和框架,通过构建特定的跨数据源的统一语言,使得分析人员可以专注于要进行威胁狩猎的内容,进而更快地发现和分析威胁、促使协作和复用威胁TTP,并支持引入机器学习进行分析。本文将针对Kestrel进行详细介绍。
网络威胁狩猎是针对新的、定制化的APT攻击,计划和开发威胁发现程序。网络威胁狩猎由多项活动组成,例如:
威胁狩猎分析人员每天都会结合数据源查询、复杂数据处理、机器学习、威胁情报富化、专有检测逻辑等来创建专有的入侵检测系统(IDS)实例。威胁狩猎分析人员利用脚本语言、电子表格、白板和其他工具来计划和执行他们的威胁狩猎。在传统的网络威胁狩猎中,许多威胁狩猎片段都是针对特定的数据源和数据类型编写的,这使得分析过程中的领域知识不可复用,威胁分析人员需要针对不同的威胁狩猎分析环境,一次又一次地表达相同的分析知识【1】,如下图示例。
在整个威胁狩猎过程可以通过提出和回答两类问题来概括:
任何威胁狩猎活动都涉及这两种类型的问题,并且这两种问题的答案都包含特定领域的知识。然而,这两类问题的领域知识类型并不相同。“What”的答案包含高度创造性、大多是抽象的领域知识,并且在很大程度上可从一种狩猎到另一种狩猎重复使用,而“ How”的答案指导“What”的实现,并从一个狩猎平台替换到另一个狩猎平台。
-
不要用不同的终端检测和响应 (EDR) 查询语言重复编写策略、技术和过程 (TTP) 模式;
-
务必使用通用语言表达所有模式,以便可以将其编译为不同的 EDR 查询以及安全信息和事件管理 (SIEM) API;
-
不要重复编写相关的狩猎步骤,例如在狩猎的不同部分针对各种记录/日志格式获取可疑进程的子进程;
-
以通用方式表达狩猎步骤流程,可以在狩猎的不同部分、甚至不同的狩猎中重复使用和重新执行;
-
不要为实现的特定领域的检测模块或专有检测框重复编写不同的执行环境适配器;
-
使用统一的输入/输出模式执行快速分析,并将现有分析封装起来以可重用的方式运行。
为了不重复工作,我们需要识别和分割所有的狩猎步骤和流程的内容和方式,并分别回答它们 -- 什么将在狩猎的不同部分或不同的狩猎中复用,如何开发不同环境下的实例。
而在传统的威胁狩猎过程中,威胁分析人员要回答狩猎什么和如何狩猎的问题。毫无疑问,安全分析人员的智慧和创造力是提出和回答“狩猎什么”,而“如何狩猎”的问题往往可以通过机器、指令的方式可以快速执行。
这其实就是Kestrel语言和框架的价值。通过统一的语言表达定义、层次化架构设计,屏蔽跨平台、跨语言的分析和查询问题,使得安全分析人员可以专注在“What”上。
Kestrel提供了一个抽象层来规避网络威胁狩猎中的重复工作,如下图所示。
Kestrel威胁狩猎语言和框架,由Kestrel语言和Kestrel Runtime框架两部分内容组成。其中
-
Kestrel 语言:一种威胁狩猎语言,供安全分析人员表达要狩猎的内容。
-
复用和共享个人狩猎步骤、狩猎流程和整个Hunt Book。
-
Kestrel Runtime:一个机器解释器,负责处理如何狩猎。
其中Kestrel Runtime由以下Python包组成:
-
firepit(repo: firepit):Kestrel 内部数据的获取、处理、存储、缓存数据,并将数据与 Kestrel 变量连接;
-
kestrel_datasource_stixbundle(repo: kestrel-lang):用于获取已打包在 STIX 包中的静态遥测数据的数据源接口;
-
kestrel_analytics_python(repo: kestrel-lang):在 Python 中调用分析的分析接口;
-
kestrel_analytics_docker(repo: kestrel-lang):在 docker 容器中执行分析的分析接口;
-
kestrel_jupyter_kernel(repo: kestrel-jupyter):Kestrel Jupyter Notebook 内核,用于在 Jupyter Notebook 中使用 Kestrel;
-
kestrel_ipython(repo: kestrel-jupyter):用于在 iPython 中编写本机 Kestrel 的iPython命令实现。
为了确保Kestrel在威胁狩猎过程中处理不同的数据源,Kestrel接口设计至关重要。
作为一种威胁狩猎语言,Kestrel 旨在访问各种数据源并以各种可能的方式执行封装分析,此外还将检索、转换、丰富和检查狩猎步骤组装到狩猎流程中。换句话说,Kestrel 在其检索狩猎步骤中处理不同的数据源,在其丰富狩猎步骤中处理不同的分析。因此,拥有一个能够适应数据源和分析并可扩展的抽象非常重要【2】。
如上图所示,Kestrel 对数据源和分析使用两级抽象:(i) 数据源或分析接口定义数据源或分析的执行方式,即输入、输出以及执行机制,以及 (ii) 每个数据源或分析都被开发为在一个或多个接口下执行。
Kestrel当前实现了两个数据源接口:STIX-shifter数据源接口和STIX包数据源接口。前者将STIX-shifter作为联合搜索层,通过STIX-shifter 连接器访问 30 多个不同的数据源。后者主要用于演示或开发目的的STIX捆绑包数据。
在实际的威胁狩猎过程中,最好通过STIX-shifter数据源接口使用数据源,以避免重新开发现有的数据管道。作为威胁狩猎分析人员或狩猎平台开发人员,您可以确定要使用的STIX-shifter 连接器并对其进行自定义,例如,根据您的特定数据模型更新映射转换。如果您的数据源不存在STIX-shifter连接器,您可以按照STIX-shifter连接器开发指南,通过向数据源提供API以及与STIX之间的映射以进行转换,从模板创建一个新的连接器。
而在下述情况下,您不需要使用STIX-shifter或STIX-shifter数据源接口:
如果您知道如何从数据源获取STIX观测中的数据,则可以将新数据源添加到STIX捆绑包数据源接口以连接到您的数据源。
如果您不喜欢STIX并且希望直接连接到Kestrel Data Representation,您可以创建一个新的数据源接口来直接将数据提取到firepit(Kestrel数据存储)中,这可以通过创建一个继承AbstractDataSourceInterface类的新Python类来实现。
另外,Kestrel目前实现了两个分析接口:Python分析接口和Docker分析接口。前者将Kestrel分析定义/运行为Python函数,而后者将Kestrel分析定义/运行为Docker容器。
现在在 Kestrel 中创建一些实体来进行测试。
proclist = NEW process [ {"name": "cmd.exe", "pid": "123"}
, {"name": "explorer.exe", "pid": "99"}
, {"name": "firefox.exe", "pid": "201"}
, {"name": "chrome.exe", "pid": "205"}
]
browsers = GET process FROM proclist WHERE [process:name IN ('firefox.exe', 'chrome.exe')]
DISP browsers ATTR name, pid
复制这个简单的狩猎流程程,粘贴到文本编辑器中,然后保存到文件中helloworld.hf。
然后,在终端中使用 Kestrel 命令行执行整个狩猎流程程:
这是Kestrel的批量执行模式。狩猎流程程将作为一个整体执行,所有结果都会在执行结束时打印。
name pid
chrome.exe 205
firefox.exe 201
[SUMMARY] block executed in 1 seconds
VARIABLE TYPE #(ENTITIES) #(RECORDS) process*
proclist process 4 4 0
browsers process 2 2 0
*Number of related records cached.
1.Retrieval检索:获取一组实体。这些实体可以直接从监控器或具有存储监控数据的数据湖检索回来,也可以在从用户到数据源的路径上的任何缓存层中快速获取。
2.Transformation转化:派生出不同形式的实体。在网络流量等基本实体类型中,威胁狩猎分析人员可以执行简单的转换,例如根据其属性对其进行采样或聚合,结果是具有聚合字段的特殊网络流量。
3.Enrichment丰富:向一组实体添加信息。计算一组实体的属性或标签并将它们附加到实体上。这些属性可以是上下文,例如 IP 地址的域名。它们也可以是威胁情报信息,甚至是现有入侵检测系统的检测标签。
4.Inspection检查:显示一组实体的信息。例如,列出一组实体的所有属性和标签,显示一组实体的指定属性的值。
5.Flow-control流控:合并或拆分狩猎流程。例如,合并两个狩猎流程的结果以在后续应用相同的狩猎步骤,或者fork一个狩猎流程分支以开发威胁假设的变体。
简单是Kestrel的设计目标,但Kestrel并没有牺牲威胁狩猎的能力。实现这两者的秘密武器是函数式编程的可组合性思想【3】。
为了自由地组成狩猎流程,Kestrel 定义了一个围绕实体的公共数据模型,即 Kestrel 变量,作为每个狩猎步骤的输入和输出。每个狩猎步骤都会产生一个 Kestrel 变量(或None),它可以是另一个狩猎步骤的输入。除了自由地通过管道传输狩猎步骤来组成狩猎流程之外,Kestrel 还支持狩猎流程分叉和合并:
-
要分叉狩猎流程,只需通过另一个狩猎步骤消耗相同的 Kestrel 变量即可。
-
要合并狩猎流程,只需执行一个采用多个 Kestrel 变量的狩猎步骤即可。
实体定义记录中的对象 。理论上,Kestrel 可以处理数据源提供的任何类型的实体。在实际使用中,用户可以主要使用STIX-shifter数据源接口(第一个支持Kestrel 的数据源接口)来检索数据。stix-shifter是一个联合搜索引擎,具有到各种数据源的stix-shifter 连接器(详情请参考文章“OCA架构系列文章之STIX Shifter详细介绍和分析(二)”)。通过STIX-shifter数据源接口检索到的数据是STIX Observed Data STIX观测数据,其中的实体称为STIX Cyber Observable Objects (SCO),其类型和属性在STIX中正式定义【4】。
请注意,STIX对自定义属性和自定义实体类型都是开放的,并且每个stix-shifter连接器都可以实现标准STIX SCO之外的实体和属性。例如,许多stix-shifter连接器产生OCA/stix-extension中定义的实体,例如x-oca-asset,它是主机/VM/容器/pod 的实体。
以下是使用STIX-shifter数据源接口时的常见实体和属性列表 :
|
|
|
|
|
powershell.exe -Command $Res = 0;
123e4567-e89b-12d3-a456-426614174000
|
|
|
|
|
parent_directory_ref.path
|
fe90a7e910cb3a4739bed918…
a9993e364706816aba3e2571…
912ec803b2ce49e4a541068d…
|
|
|
|
|
|
|
|
|
2001:0db8:85a3:0000:0000:8a2e:0370:7334
|
|
|
|
|
|
|
|
|
https://example.com/research/index.html
|
|
|
|
|
|
|
|
|
HKEY_LOCAL_MACHINESystemFooBar
|
|
|
|
|
|
|
|
|
C=ZA, ST=Western Cape, L=Cape Town …
fe90a7e910cb3a4739bed918…
a9993e364706816aba3e2571…
912ec803b2ce49e4a541068d…
|
|
|
|
扩展中心图形模式(ECGP,Extended Centered Graph Pattern) 描述了一个森林,其中一棵树称为中心子图,其他树称为扩展子图。
ECGP 是STIX 模式的超集,这意味着可以直接在WHERE子句中编写 STIX 模式。ECGP 给出了标准 STIX 模式(又称中心图模式)的语义解释,并在简单性和表现力方面超越了它。接下来的内容将从最简单的形式解释ECGP的全部功能。
Kestrel 实现了基于实体的推理,因此使用 Kestrel 执行的最简单的任务是根据实体的某个属性获取实体。例如,人们可能希望获取某一时间范围内受监视终端终端上执行的所有powershell.exe进程。其模式非常简单:
这称为比较表达式,它由一个属性和指定值组成。在本例中,单一比较表达式构造了这个简单模式 (ECGP)。
假设终端可以由 Kestrel 数据源stixshfiter://edp1指定,时间范围是2022-11-11T15:05:00Z到2022-11-12T08:00:00Z,我们可以将模式放在WHERE命令的子句中,整个GET命令是:
ps = GET process
FROM stixshifter://edp1
WHERE name = "powershell.exe"
START 2022-11-11T15:05:00Z STOP 2022-11-12T08:00:00Z
1.该命令可以用任何缩进样式写成一行或多行。模式本身可以写成一行或多行,也就是说,以下任一内容都是有效的,并且变量ps1和ps2具有相同的实体表达:
ps1 = GET process FROM stixshifter://edp1 WHERE name = "powershell.exe"
START 2022-11-11T15:05:00Z STOP 2022-11-12T08:00:00Z
ps2 = GET process
FROM stixshifter://edp1
WHERE name =
"powershell.exe"
START 2022-11-11T15:05:00Z
STOP 2022-11-12T08:00:00Z
2.可以在字符串文字前后使用单引号或双引号,这种模式是等效的:
name = 'powershell.exe'
name = "powershell.exe"
3.为了兼容 STIX 模式,可以在属性之前指定实体类型,例如entity_type:attribute。对于简单的powershell模式,由于返回实体类型已在之前的GET命令中指定,因此这是冗余且可选的。简而言之,以下命令在ps3中返回的结果与在ps中返回的结果完全相同。
ps3 = GET process
FROM stixshifter://edp1
WHERE process:name = 'powershell.exe'
START 2022-11-11T15:05:00Z STOP 2022-11-12T08:00:00Z
4.为了与 STIX 模式兼容,可以在时间范围规范(START/STOP)之前的WHERE子句中加上方括号 。也就是说,以下命令在ps4中返回的结果与在ps中返回的结果完全相同。
ps4 = GET process
FROM stixshifter://edp1
WHERE [process:name = 'powershell.exe']
START 2022-11-11T15:05:00Z STOP 2022-11-12T08:00:00Z
5.Kestrel 在比较表达式中支持三种类型的值:文字字符串、数字或列表(或嵌套列表)。例如:
-
列为值:name IN ('bash', 'csh', "zsh", 'sh')
-
列表周围的方括号:dst_port IN [80, 443, 8000, 8888]
-
嵌套列表支持(解析后展平):name IN ('bash', ('csh', ('zsh')), "sh")
-
>/>=/</<=:它们将数字<作为<=值来工作。
-
LIKE:后面跟带有通配符%的带引号的字符串(如SQL中所定义)。
-
MATCHES:后面跟带引号的正则表达式(PCRE)字符串。
-
ISSUBSET:仅用于判断 IP 地址/子网是否在子网中,例如ipv4-addr:value ISSUBSET '198.51.100.0/24',STIX模式中的详细信息 。
-
ISSUPERSET:仅用于判断一个 IP 子网是否大于另一个子网/IP,例如ipv4-addr:value ISSUPERSET '198.51.100.0/24',STIX模式中的详细信息。
从指定单一比较表达式升级到在模式中描述返回实体的多个属性,可以使用逻辑运算符AND和OR来组合比较表达式,并使用括号()来提高组合表达式的优先级。
proc1 = GET process
FROM stixshifter://edp1
WHERE name = "powershell.exe" AND pid = 1234
START 2022-11-11T15:05:00Z STOP 2022-11-12T08:00:00Z
netflow1 = GET network-traffic
FROM stixshifter://gateway1
WHERE dst_port = 80 OR dst_port = 443
START 2022-11-11T15:05:00Z STOP 2022-11-12T08:00:00Z
minikatz = GET file
FROM stixshifter://edp1
WHERE name = "C:ProgramDatap.exe"
OR hashes.MD5 IN ( "1a4fe4413a92d478625d97b7df1bd0cf"
, "b6ff8f31007a3629a3c4be8999001ec9"
, "e8994399f1656e58f72443b8861ce5d1"
, "9ae602fddb5d2f9b63c5eb6aad0a2612"
)
START 2022-11-11T15:05:00Z STOP 2022-11-12T08:00:00Z
users = GET user-account
FROM stixshifter://authlogs
WHERE (user_id = 1001 AND account_login = "Tracy")
OR user_id = 0
OR (user_id = 1003 AND is_privileged = true)
OR (account_login = "JJ" AND is_privileged = true)
START 2022-11-11T15:05:00Z STOP 2022-11-12T08:00:00Z
结果是一个具有单个节点 -- 返回实体的图形模式。
在 STIX 中使用引用_ref/_refs[*]可以描述图模式中的边。这将模式从单个节点扩展到具有根的树,该树称为中心子图,根是返回的实体。
上图说明了围绕中心节点C(一个进程)的中心化图模式:
procs = GET process FROM stixshifter:
WHERE name = 'cmd.exe'
AND binary_ref.name MATCHES '.+.(exe|dll|bat)$'
AND opened_connection_refs[*].dst_ref.value = '10.1.1.1'
AND ( ( parent_ref.name = 'explorer.exe' AND
parent_ref.binary_ref.name = 'explorer.exe'
) OR
( parent_ref.name LIKE '%.exe' AND
parent_ref.binary_ref.name != 'powershell.exe'
)
)
START 2022-11-11T15:05:00Z STOP 2022-11-12T08:00:00Z
Kestrel将ECGP与每个记录匹配,检索包含ECGP实例的记录,将ECGP的中心实体返回到Kestrel变量,并将检索到的记录中的所有实体缓存在firepit中(为每个Kestrel会话建立的内存中/磁盘上/远程存储))。
更准确地说,Kestrel从ECGP生成一个STIX 观察表达式,并附加时间范围限定符(START/STOP)以创建一个STIX模式,然后将STIX模式传递给Kestrel数据源接口(例如STIX-shifter数据源接口)进行匹配。
目前,Kestrel生成的一个STIX模式仅包含一个STIX观察表达式,并且仅使用START/STOP限定符。由于一个STIX观察表达式与STIX中的一条记录相匹配 ,因此我们得出本小节开头给出的结论:“Kestrel将ECGP与每条记录进行匹配。”
如果有人在ECGP中描述了一个大的模式,但数据源只有很小的记录怎么办?例如,可以将ECGP编写为具有三个节点的中心子图:中心进程、父进程和祖父进程:
procs = GET process FROM stixshifter:
WHERE name = 'cmd.exe'
AND parent_ref.name = 'explorer.exe'
AND parent_ref.parent_ref.name = 'abc.exe'
如果数据源edp1将记录定义为单独的系统事件或系统调用(一条记录主要有一个进程及其父进程,但没有其祖父进程),则ECGP将不匹配任何内容,因为edp1中没有任何单个记录可以满足大模式。
当我们在传统的基于记录的系统之上运行 Kestrel(基于实体的推理)时,这是一个基本限制。Kestrel Runtime可能会将一个ECGP拆分为多个STIX观察表达式以匹配多个记录,但是:
1.STIX 没有定义记录(STIX观察)的大小/边界,也不知道要拆分多少个 STIX 观察表达式。
2.每个数据源以不同的方式定义记录的大小/边界 ,并且Kestrel并不总是能够很好地记录或通过API检索该定义。
Kestrel官方建议用户编写小型 Kestrel ECGP(单跳半径的子图)来缓解实际使用中的此问题,特别是当用户不知道数据源中的记录有多大时。用户可以显式地将一个大模式拆分成更小的模式(在GET命令中),再加上一些 Kestrel FIND命令来连接它们。每个Kestrel命令(例如GET和FIND)都会生成一个或多个STIX模式并组合结果。
使用图数据库代替基于记录的存储/检索是这个问题的最终解决方案。
模式匹配解释得出的结论是,Kestrel将ECGP与每条记录进行匹配。一方面,记录限制了匹配。另一方面,结果可以提供额外的信息来匹配中心子图 -- 记录中可能存在未连接到中心实体(中心子图的根)的信息,但该信息在查找/匹配中心子图时是有用/辅助的。
由于在基于实体的推理中,一切都是图的一部分,因此辅助信息应该能够表示为子图。现在我们在ECGP中添加扩展子图的概念,因此ECGP被称为扩展中心化图模式。
上图说明了扩展中心化图模式(C是中心子图的根;E是扩展子图的根):
procs = GET process FROM stixshifter:
WHERE name = 'cmd.exe'
AND binary_ref.name MATCHES '.+.(exe|dll|bat)$'
AND opened_connection_refs[*].dst_ref.value = '10.1.1.1'
AND ipv4-addr:value NOT ISSUBSET '192.168.0.0/24'
AND ( ( parent_ref.name = 'explorer.exe' AND
parent_ref.binary_ref.name = 'explorer.exe'
) OR
( parent_ref.name LIKE '%.exe' AND
parent_ref.binary_ref.name != 'powershell.exe'
)
)
AND email-message:from_ref.value = '[email protected]'
START 2022-11-11T15:05:00Z STOP 2022-11-12T08:00:00Z
此模式中的中心子图与中心化图模式中的中心子图相同,而此 ECGP 为匹配指定了额外的约束:任何匹配的记录都应该包含子网192.168.0.0/24中的ipv4-addr和来自[email protected]的电子邮件。扩展子图的三个规则:
1.扩展子图的根实体类型应该与中心子图的根实体类型不同。否则,生成的 STIX 模式会将扩展子图视为中心子图的分支。
2.应指定扩展子图的根实体类型,后跟冒号:,然后是属性、运算符和值。语法与 STIX 路径一致,并且实体类型必须是扩展子图根。
3.扩展子图可以指定在ECGP中的任何地方,这使得编写复杂的逻辑成为可能,例如当中心图是一种形状时使用扩展子图;否则,指定另一个扩展子图或不指定扩展子图来帮助匹配。
上面的例子是一个极端复杂的情况,用于说明 ECGP 中多个不相关的扩展子图。在实际使用中,最常用的扩展子图是主机规范(仅匹配特定主机上的记录),例如:
x-oca-asset:hostname = 'endpoint101'
标准STIX没有用于主机/pod/容器的STIX网络可观察对象(SCO),因此OCA在OCA/stix-extension中提供了自定义的SCO(实体)x-oca-asset 作为STIX扩展,x-oca-asset被大多数stix-shifter连接器支持 。它没有来自标准STIX SCO(实体)的引用,因此它是记录中的孤立子图,扩展子图可以使用此类信息进行模式匹配。
除了静态模式之外,Kestrel还允许引用ECGP中的变量,即可以使用variable.attribute在比较表达式中传递一个值列表 (而不是变量本身,因为比较表达式不接受变量而是值)。这支持使用现有结果快速构建模式,并且支持构建跨数据源搜索的模式。
py1 = GET process FROM stixshifter://edp
WHERE pid = px.pid
py2 = GET process FROM stixshifter://edp
WHERE pid IN px.pid
py3 = GET process FROM stixshifter://edp
WHERE pid IN (123, px.pid, (4, 10548))
当ECGP中使用一个或多个变量引用时,Kestrel 会自动:
1.从匹配/检索的记录中提取实体(在变量中)的时间范围 ;
3.在Configuration中试用timerange_start/stop_offset调整联合时间范围 ;
用户可以通过 在ECGP所在的命令中指定START/STOP来覆盖生成的时间范围,例如GET。
1. 威胁狩猎分析人员正在跟踪跨越两个终端edp1和edp2的横向移动。她已经抓取了edp1上的一堆可疑进程 到Kestrel变量procs1中,并将与进程相关的所有网络流量检索到procs1中。一些网络流量的目标IP与edp2关联,因此她希望跟踪从edp1上的procs1到edp2上的未知进程列表的网络流量,并打印它们的命令行。假设edp1和edp2配置为两个Kestrel数据源,她可以执行以下操作:
procs1 = ...
nt1 = FIND network-traffic CREATED BY procs1
DISP nt1 ATTR src_ref.value, src_port, dst_ref.value, dst_port
nt2 = GET network-traffic
FROM stixshifter://edp2
WHERE src_ref.value = nt1.src_ref.value
AND src_port = nt1.src_port
AND dst_ref.value = nt1.dst_ref.value
AND dst_port = nt1.dst_port
nt2x = GET network-traffic
FROM stixshifter://edp2
WHERE src_ref.value = nt1.src_ref.value
procs2 = FIND process CREATED nt2
DISP procs2 ATTR command_line
2. 终端edp通过代理服务器pxy访问互联网。Kestrel数据源stixshifter://edp是edp上EDR,另一个Kestrel数据源stixshifter://pxy管理代理日志。由于所有网络流量都是代理的,所以观察到的网络流量edp都有远程IP作为代理服务器,但不是真正的远程IP。为了获取真实的远程IP并运行Kestrel分析以使用某些威胁情报来丰富IP,威胁狩猎分析人员需要首先关联数据:
nt_inner = ...
nt_outter = GET network-traffic
FROM stixshifter://pxy
WHERE src_ref.value = nt_inner.src_ref.value
AND src_port = nt_inner.src_port
DISP nt_outter ATTR dst_ref.value, dst_port
APPLY python://xfeipenrich ON nt_outter
比较表达式中的Kestrel字符串文字类似于标准的Python字符串。它支持特殊字符的转义,例如,n意味着换行。
字符串文字可以用匹配的单引号 ( ' )或双引号( " )括起来。反斜杠( \ )字符用于转义具有特殊含义的字符,例如换行符、反斜杠本身或引号字符。
pe1 = GET process FROM stixshifter://edp1
WHERE command_line = "powershell.exe "yes args""
pe2 = GET process FROM stixshifter://edp1
WHERE command_line = 'powershell.exe "yes args"'
pe3 = GET process FROM stixshifter://edp1
WHERE command_line = "powershell.exe 'yes args'"
pe4 = GET process FROM stixshifter://edp1
WHERE command_line = "C:\Windows\System32\cmd.exe"
ps5 = GET process FROM stixshifter://edp1
WHERE name MATCHES 'cmd\.exe'
ps5 = GET process FROM stixshifter://edp1
WHERE name MATCHES '\w+\.exe'
转义后的字符串对正则表达式的使用并不友好,导致需要编写四个反斜杠\\来表示单个精确的反斜杠字符,例如,STIX模式需要"[artifact:payload_bin MATCHES 'C:\\Windows\\system32\\svchost\.exe']"表示原始路径C:Windowssystem32svchost.exe 。
为了克服这种不便,Kestrel像Python一样提供原始字符串,这意味着Kestrel原始字符串中没有转义字符(解释原始字符串时不进行转义计算)。
f1 = GET file FROM stixshifter://edp1
WHERE name MATCHES 'C:\\Windows\\System32\\cmd\.exe'
f2 = GET file FROM stixshifter://edp1
WHERE name MATCHES r'C:\Windows\System32\cmd.exe'
f3 = GET file FROM stixshifter://edp1
WHERE name = 'C:\Windows\System32\cmd.exe'
f4 = GET file FROM stixshifter://edp1
WHERE name = r'C:WindowsSystem32cmd.exe'
Kestrel命令描述了狩猎步骤分为以下五类【5】:
1.Retrieval检索:GET,FIND,NEW;
2.Transformation转换:SORT,GROUP;
4.Inspection检查:INFO,DISP,DESCRIBE;
5.Flow-control流控制:SAVE,LOAD,ASSIGN,MERGE,JOIN。
为了实现可组合的狩猎流程,并允许威胁狩猎分析人员自由地组成狩猎流程,任何Kestrel命令的输入和输出定义如下:
一个命令接受一个或多个变量,可能还包括一些元数据,例如数据源的路径、要显示的属性或分析参数。然后,该命令可以不产生任何结果、不生成一个变量、不显示一个对象,或者既生成一个变量又生成一个显示对象。
-
如可组合的狩猎流程所示,命令使用和产生的Kestrel变量在将不同的狩猎步骤(命令)连接到狩猎流程中发挥着关键作用。
-
显示对象是由Kestrel前端显示的东西,例如Jupyter Notebook,它不会被任何以下狩猎步骤消耗。它仅向用户呈现狩猎步骤中的信息,例如变量中实体的表格显示,或实体的交互式可视化。
命令GET是一个检索狩猎步骤,用于将图形模式和匹配中定义的扩展中心图形模式 (ECGP)与实体池进行匹配,并返回同构实体(池中满足该模式的实体子集)的列表。
returned_variable = GET returned_entity_type [FROM entity_pool] WHERE ecgp [time_range] [LIMIT limit]
-
returned_entity_type是在关键字GET之后指定的;
-
该池可以是数据源,在该数据源中产生/存储的记录中具有不同类型的实体。例如,数据源可以是存储监控日志的数据湖、EDR、防火墙、IDS、代理服务器或SIEM系统。entity_pool是数据源的标识符,例如:
-
stixshifter://host101:通过STIX-shifter数据源接口在host101上的EDR;
-
https://a.com/b.json:STIX捆绑包中的遥测数据。
-
池也可以是现有的Kestrel变量(该变量中所有相同类型的实体)。在这种情况下,entity_pool是变量名称。
一般情况下,GET命令需要FROM子句。有一个例外:Kestrel Runtime会记住在狩猎会话GET命令中使用的最后一个数据源。如果会话中已经执行了数据源(不是变量)为entity_pool的GET命令,并且用户希望使用相同的数据源编写新的GET命令,则可以省略FROM子句。
-
time_range在时间范围中描述,可以使用的绝对和相对时间范围语法,这是可选的,Kestrel将尝试按以下顺序为模式指定时间范围(数字越小,优先级越高):
1.使用时间范围语法(如果提供)的用户指定的时间范围。
2.ECGP中Kestrel变量的时间范围(如果存在)。
3.STIX-shifter连接器默认时间范围, 例如最后五分钟。
-
limit是一个可选参数,用于指定GET查询要返回的记录数。在当前的实现中,Kestrel将返回有限的observed-data记录。返回的returned_entity_type记录数 可能不同,因为这取决于observed-data数据集中包含的returned_entity_type记录数。
procs = GET process FROM stixshifter://host101 WHERE parent_ref.name = 'abc.exe'
START 2021-05-06T00:00:00Z STOP 2021-05-07T00:00:00Z
binx = GET file
FROM https://a.com/b.json
WHERE hashes.MD5 = 'dbfcdd3a1ef5186a3e098332b499070a'
START 2021-05-06T00:00:00Z STOP 2021-05-07T00:00:00Z
procs2 = GET process FROM procs WHERE pid = 10578 AND name = 'xyz'
procs3 = GET process FROM procs WHERE pid = procs2.pid
procs4 = GET process WHERE pid = 1234
START 2021-05-06T00:00:00Z STOP 2021-05-07T00:00:00Z
命令FIND是检索狩猎步骤,用于返回连接到给定实体列表的实体。
returned_variable = FIND returned_entity_type RELATIONFROM input_variable [WHERE ecgp] [time_range] [LIMIT limit]
Kestrel 定义了两类关系:5种特定关系和1种通用关系。特定关系是有向的,一般关系是无向的。详细如图:
Kestrel关系主要基于标准STIX数据模型,例如STIX 2.0中的_ref和STIX 2.1中的SRO。虽然STIX是可扩展的,并且数据源可以带来自己的自定义关系映射,但 Kestrel仅实现标准STIX中支持的关系以确保其通用性。好的部分是,这会自动适用于所有stix-shifter连接器,这些连接器大多遵循STIX标准。不好的部分是,标准STIX没有定义进程的文件读/写/创建/删除等操作,所以目前缺少这些特定的关系,用户可以使用泛型关系来查找相关实体的超集作为部分解决方案。
parent_procs = FIND process CREATED procs
parent_procs = FIND process CREATED BY procs
nt = FIND network-traffic CREATED BY procs
ntprocs = FIND process CREATED network-traffic
src_ip = FIND ipv4-addr CREATED nt
src_ip = FIND ipv4-addr ACCEPTED nt
src_ip = FIND ipv4-addr LINKED nt
ntspecial = FIND network-traffic CREATED BY src_ip
同时,支持Limited ECGP in FIND的特殊表达如下:
FIND中的WHERE子句是一个可选组件,用于在生成对数据源的低级查询时添加约束,与GET命令类似,在FIND的WHERE的子句中使用ECGP。但是,只需将扩展子图组件写在 ECGP 中即可。如果ECGP中存在中心子图组件,则在Kestrel生成低级查询时,它将被丢弃/放弃。设计原理:
1.在GET中,WHERE子句是唯一描述返回变量约束的位置;
2.在FIND中,返回变量的主要约束已经由关系提供。通过给定关系从输入变量连接的返回变量本质上是一跳中心子图;
3.如果ECGP具有中心子图组件,则它可能与第二点中生成的一跳中心子图冲突。因此,Kestrel算法在查找过程中如果ECGP中存在中心子图,则丢弃该中心子图。
4.由于扩展子图不与FIND中的关系相冲突,并且它可以提供额外的约束以避免不必要的计算/传输,因此它被包含在对数据源生成的低级查询中。
以下示例是一个完全有效的、带有ECGP的FIND:
parent_procs_ww = FIND process CREATED procs
WHERE x-oca-asset:hostname = 'endpoint101'
如果用户编写以下内容,实际上结果与上面的示例相同:
parent_procs_ww2 = FIND process CREATED procs
WHERE name = 'bash' AND x-oca-asset:hostname = 'endpoint101'
如果用户想要匹配仅有bash的父进程,他/她需要一个两步的Huntflow狩猎流程:
parent_procs_ww = FIND process CREATED procs
WHERE x-oca-asset:hostname = 'endpoint101'
parent_procs_bash = parent_procs_ww WHERE name = 'bash'
另外,支持Time Range in FIND的特殊表达如下:
time_range可选的,Kestrel将从input_variable中推断出时间范围,类似于引用ECGP中的变量中的时间推断。仅当用户想要覆盖从input_variable推断的时间范围时,才需要提供时间范围。
覆盖时间范围示例:服务进程在主机上运行了几天。进程创建/分叉的记录发生在第1天,而其大部分活动发生在第4-5天。对流程的狩猎开始于第4-5天,其中包含一些GET。当威胁狩猎分析人员想要FIND服务进程的父进程时,如果他/她没有指定时间范围(进程创建记录在推断的时间范围:第4-5天),他/她将什么都检索不到。威胁狩猎分析人员可以通过指定的时间范围来扩展和覆盖FIND命令中的时间范围,以最终检索父进程。没有人(威胁狩猎分析人员或 Kestrel)知道进程何时创建/分叉,因此在威胁狩猎分析人员将时间范围扩大到足够大的时间范围以FIND检索父进程之前,可能需要进行一些尝试和错误。狩猎手册的草图:
nt = GET network-traffic
FROM stixshifter://edp
WHERE dst_ref.value = '10.10.30.1'
LAST 5 DAY
p1 = FIND process CREATED nt
pp1 = FIND process CREATED p1
p2 = FIND process CREATED nt LAST 10 DAY
pp2 = FIND process CREATED p2
再有,支持Limit in FIND的特殊表达如下:
limit是一个可选参数,用于指定要由FIND查询返回的记录数。在当前的实现中,Kestrel将返回有限的observed-data记录。返回的 returned_entity_type记录数可能会有所不同,因为这取决于observed-data数据集中包含的returned_entity_type记录数。
最后,支持的Relation With GET的特殊表达如下:
FIND和GET都是检索狩猎步骤。GET是最基本的检索狩猎步骤,而FIND提供了一个抽象层来检索连接的实体,比使用原始的GET更容易,也就是说,理论上只要有一些关于如何搜索的知识,就可以用GET来代替FIND。
理论上,当知道底层记录如何连接时,可以用GET和参数化的STIX模式来替换。实际上,使用GET中的STIX模式是不可能做到这一点的。
命令NEW是一个特殊的检索狩猎步骤,用于直接从给定数据创建实体。
returned_variable = NEW [returned_entity_type] data
-
字符串[str]的列表。如果使用了该属性,则returned_entity_type是必需的。Kestrel Runtime根据返回类型创建实体列表,每个实体都有一个初始属性;
-
字典列表[{str: str}]。所有字典应该共享相同的键集,它们是实体的属性。如果未将type作为键提供,则需要return_entity_type。
给定的数据应该遵循JSON格式,例如,使用双引号括起字符串。这与STIX模式中的字符串不同,后者由单引号括起来。
newprocs = NEW process ["cmd.exe", "explorer.exe", "google-chrome.exe"]
newvar = NEW [ {"type": "process", "name": "cmd.exe", "pid": "123"}
, {"type": "process", "name": "explorer.exe", "pid": "99"}
]
newvar2 = NEW process [ {"name": "abc.exe", "pid": "1234"}
, {"name": "ie.exe", "pid": "10"}
]
命令SORT是一个转换狩猎步骤,用于对Kestrel变量中的实体重新排序,并将具有新顺序的同一组实体输出到新变量。虽然在显示时,DISP中的SORT子句只改变实体的顺序一次,但SORT命令会在会话的存储区中重新排序实体(在变量中),因此使用该变量的所有后续命令将以更新后的顺序查看实体。大多数Kestrel命令是顺序不敏感的,但是可以开发实体顺序敏感的分析并通过应用调用。
newvar = SORT varx BY attribute [ASC|DESC]
-
attribute是属性名称,类似于pid或x_suspicious_score(如果varx是进程,则在运行可疑进程评分分析后) ;
-
默认情况下,数据将按降序排序。用户可以显式指定排序方向,例如ASC:升序。
nt = GET network-traffic FROM stixshifter://idsX WHERE dst_ref_value = '1.2.3.4'
ntx = SORT nt BY dst_port ASC
DISP ntx ATTR dst_port
命令GROUP是一个转换狩猎步骤,用于根据一个或多个属性对实体进行分组,并计算聚合实体的聚合属性。
aggr_var = GROUP varx BY attr1, attr2... [WITH aggr_fun(attr3) [AS alias], ...]
aggr_var = GROUP varx BY BIN(attr, bin_size [time unit])... [WITH aggr_fun(attr3) [AS alias], ...]
数字和时间戳属性可以使用BIN函数进行“装箱”或“装桶”。这个函数接受2个参数:一个属性,和一个整数bin大小。对于时间戳属性,bin大小可以包括单位。
如果未指定聚合函数,将自动选择它们。在这种情况下,返回实体的属性将用unique_前缀修饰,例如而unique_pid 而不是pid。
当不指定alias聚合时,聚合属性将以聚合函数为前缀,例如min_first_observed。
procs = GET process FROM stixshifter://edrA WHERE parent_ref.name = 'bash'
aggr = GROUP procs BY name
DISP aggr ATTR unique_name, unique_pid, unique_command_line
conns = GET network-traffic FROM stixshifter://my_ndr WHERE src_ref.value LIKE '%'
conns_ts = TIMESTAMPED(conns)
conns_binned = GROUP conns_ts BY BIN(first_observed, 5m) WITH COUNT(src_port) AS count
命令APPLY是一个丰富狩猎步骤,用于计算属性并将其添加到 Kestrel 变量,以及生成可视化对象,这称为化富,因为外部计算的结果作为返回实体的新的/更新的属性合并回搜索流。外部计算(又名 Kestrel 中的分析)可以执行检测、威胁情报富化、异常检测、聚类、可视化或任何语言的计算。这种机制使该APPLY命令成为 Kestrel 的外语接口。
APPLY analytics_identifier ON var1, var2, ... WITH x=abc, y=[1,2,3], z=varx.pid
-
输入:该命令接受一个或多个Kestrel变量,例如var1, var2。
-
值可以是文字字符串、带引号的字符串(带有转义字符)、列表或嵌套列表。
-
值可以包含对Kestrel变量的引用。与ECGP中的变量引用一样,引用Kestrel变量时需要指定实体的属性。Kestrel将取消引用属性/变量,例如,z=varx.pid将枚举变量varx的所有pid,这些pid变量可以展开为[4, 108, 8716],最后一个参数在传递给分析器时是z=[4, 108, 8716]。
-
执行:该命令会像docker://ip_domain_map或python://pin_ip_on_map那样执行由analytics_identifier指定的分析。
除了相应的 Kestrel 分析接口指定的输入和输出之外,分析器可以执行的操作没有任何限制 。分析可以完全在本地运行,然后只进行表查找。它可以像 VirusTotal 服务一样连接到互联网,它可以对二进制样本进行实时行为分析。基于特定的分析接口,一些分析可以完全在云端运行,并且该接口将结果收集到本地 Kestrel Runtime。
威胁狩猎分析人员可以快速将现有的安全程序/模块包装到Kestrel分析中。例如,将Kestrel分析创建为docker容器并利用现有的Kestrel Docker分析接口(Docker Analytics Interface)。您还可以轻松地开发新的分析接口以提供特殊的运行环境。
-
输出:执行的分析可能会产生(a)变量更新的数据或(b)显示对象中的一个或两个。应用命令将影响传递给Kestrel会话:
-
更新变量:最常见的填充是向输入变量(现有实体)添加/更新属性。这些属性可以是,但不限于:
-
检测结果:分析对给定的实体进行威胁检测。结果可以是任何标量值,如字符串、整数或浮点数。例如,恶意软件标签及其系列可以是字符串,可疑分数可以是整数,可能性可以是浮点数。数值数据可以被后来的Kestrel命令使用,例如,SOD。任何新属性都可以在以下GET命令的WHERE子句中使用,以选取实体的子集。
-
威胁情报(TI)信息:通常称为TI信息,例如IoC(Indicator of Comprise)标签。
-
通用信息:分析可以添加非TI特定的通用信息,例如根据软件实体的名称属性将软件描述作为新属性添加到软件实体中。
-
Kestrel 显示对象:分析也可以生成一个显示对象,供前端显示。可视化分析会产生这样的数据,例如我们的python://pin_ip_on_map分析,它查找网络流量或ipv4-addr实体中IP地址的地理位置,并将它们固定在地图上,这可以在Jupyter Notebooks中显示。
-
命令INFO是一个检查狩猎步骤,用于显示Kestrel变量的详细信息。
这些属性名对于用户使用ATTR子句来构造DISP命令特别有用。
nt = GET network-traffic FROM stixshifter://idsX WHERE dst_port = 80
INFO nt
该命令DISP是打印Kestrel变量中实体的属性值的检查步骤。命令向前端返回一个表格显示对象,例如Jupyter Notebook。
DISP [TIMESTAMPED(varx)|varx]
[WHERE ecgp]
[ATTR attribute1, attribute2, ...]
[SORT BY attibute [ASC|DESC]]
[LIMIT l [OFFSET n]]
-
可选的转换TIMETAMPED检索varx中每个实体的每个观察的first_observe时间戳。
-
可选子句WHERE指定一个ECGP(在Graph Pattern和Matching中定义)作为过滤器。只有ECGP的居中子图组件(而非扩展子图)将被处理用于DISTANCE命令。
-
可选子句ATTR指定要打印的属性列表。如果省略,Kestrel将输出所有属性。
-
可选子句SORT BY指定要使用哪个属性来排序要打印的实体。
-
该命令将删除重复的行。显示对象中的所有行都是不同的。
-
该命令将遍历本地存储中关于变量中的实体的所有记录/日志。有些记录可能缺少其他记录具有的属性,并且经常会看到打印表中的空字段。
-
如果对数据不熟悉,可以使用INFO列出所有的属性,然后选取一些属性来编写DIS命令和ATTR子句。
nt = GET network-traffic FROM stixshifter://idsX WHERE dst_port = 80
DISP nt ATTR src_ref.value, src_port, dst_ref.value, dst_port
procs = GET process FROM stixshifter://edrA WHERE parent_ref.name = 'bash'
DISP procs ATTR pid, name, command_line
DISP TIMESTAMPED(procs) ATTR pid, name, command_line
命令DESCRIBE是一个检查狩猎步骤,用于显示Kestrel变量属性的描述性统计信息。
nt = GET network-traffic FROM stixshifter://idsX WHERE dst_port = 80
DESCRIBE nt.src_port
命令SAVE是将Kestrel变量转储到本地文件的流控制搜索步骤。
-
输入变量(数据表)中实体的所有记录都会被打包到输出文件中。
-
.parquet.gz:gzip 压缩的parquet文件。
-
将 Kestrel 变量保存到文件中以进行分析开发非常有用。Docker Analytics Interface 实际上做了同样的事情,为Docker容器准备输入。
procs = GET process FROM stixshifter://edrA WHERE parent_ref.name = 'bash'
SAVE procs TO /tmp/kestrel_procs.parquet.gz
命令LOAD是一个流控制搜索步骤,用于将数据从磁盘加载到Kestrel变量中。
newvar = LOAD file_path [AS entity_type]
-
.parquet.gz:gzip 压缩的parquet文件。
-
该命令加载相同类型实体的记录。如果数据中没有类型列,则应在AS子句中指定返回的实体类型。
-
使用SAVE和LOAD,你可以在寻线之间传送数据。
-
用户可以将外部威胁情报 (TI) 记录LOAD到Kestrel变量中。
procs = GET process FROM stixshifter://edrA WHERE parent_ref.name = 'bash'
SAVE procs TO /tmp/kestrel_procs.parquet.gz
pload = LOAD /tmp/kestrel_procs.parquet.gz
susp_ips = LOAD /tmp/suspicious_ips.csv AS ipv4-addr
nt = GET network-traffic
FROM stixshifter://idsX
WHERE dst_ref.value = susp_ips.value
命令ASSIGN是一个流控制狩猎步骤,用于将数据从一个变量复制到另一个变量。
newvar = oldvar
newvar = TIMESTAMPED(oldvar)
newvar = oldvar [WHERE ecgp] [ATTR attr1,...] [SORT BY attr] [LIMIT n [OFFSET m]]
-
在第二种形式中,newver比oldvar多了一个first_observed属性。
-
在第三种形式中,oldvar将被过滤,结果赋值给newvar。
-
WHERE中的ecgp为Graph Pattern and Matching中定义的ECGP。对于ASSIGN命令,仅处理ECGP的居中子图组件(非扩展子图)。
-
attr和attr1是在实体和变量中定义的实体属性。
copy_of_procs = procs
ssh_conns = conns WHERE dst_port = 22
ts_urls = TIMESTAMPED(urls)
wmic_procs = TIMESTAMPED(procs) WHERE command_line LIKE '%wmic%'
p2 = procs WHERE pid IN (4, 198, 2874)
p3 = procs WHERE pid = p2.pid
p4 = procs WHERE pid IN (p2.pid, 8888, 10002)
p5 = procs WHERE pid = p2.pid AND name = "explorer.exe"
该命令MERGE是一个流控制狩猎步骤,用于合并多个变量中的实体。
merged_var = var1 + var2 + var3 + ...
procsA = GET process FROM stixshifter://edrA WHERE parent_ref.name = 'bash'
procsB = GET process FROM stixshifter://edrA WHERE binary_ref.name = 'sudo'
procs = procsA + procsB
APPLY docker://susp_proc_scoring ON procs
命令JOIN是一个高级流控制狩猎步骤,它直接作用于实体记录,以实现全面的实体连接发现。
newvar = JOIN varA, varB BY attribute1, attribute2
该命令接受两个Kestrel变量和每个变量的一个属性。它根据两个变量的连接属性对这两个变量的所有记录执行内部连接。
-
该命令保留varA中的所有属性,并从varB添加属性(如果varA中不存在)。
procsA = GET process FROM stixshifter://edrA WHERE name = 'bash'
procsB = GET process WHERE binary_ref.name = 'sudo'
procsC = JOIN procsA, procsB BY pid, parent_ref.pid
procsD = FIND process CREATED procsB
procsE = GET process FROM procsD WHERE pid = procsA.pid
Kestrel提供了一个抽象层,用于使用跨多个数据源和数据类型运行的标准搜索步骤来组合搜索流【6】。
在狩猎流程中的不同狩猎步骤/命令中(如下图所示),检索步骤(例如GET)是将数据从数据源拉到Kestrel内部数据存储(firepit)的步骤。在检索步骤的执行过程中,数据被传输、转换和接收到内部数据存储中。
Kestrel内部数据存储在关系数据库之上实现了数据的实体关系视图 -- 每种类型的实体驻留在一个表中,实体之间的关系驻留在另一个表中。Kestrel变量被实现为实体表的视图。大多数搜索步骤(如转换和检查)是只读或投影步骤,除了检索和富集步骤(Kestrel分析)可以在表中写入新的行/列,甚至创建表,例如,从检索中创建新实体。
在现实世界的威胁狩猎中,数据检索(上图)是通过当前Kestrel实现中的stix-shifter完成的,这使得安全分析人员能够连接到30多个数据源(EDR、SIEM、日志管理系统、数据湖等)。在上图中,底部三个方框链接之前的大多数阶段都是轻量且快速的,而最耗时的阶段是底部的三个方框:
-
传输 Transmission:I/O 绑定阶段,将数据从数据源传输回 Kestrel Runtime,该Runtime可以在威胁狩猎分析人员的笔记本电脑上、本地狩猎服务器上运行,或者作为云中专用于狩猎的容器运行。
-
转换 Translation:将从数据源传回的原始数据转换为标准/规范化格式,如STIX(文本中的JSON)或Pandas DataFrame的CPU绑定阶段。
-
摄取 Ingestion:CPU和I/O绑定的阶段,从归一化的数据中执行实体/关系识别,并将实体和关系摄取到Kestrel内部数据存储中。
实际上,第一个复杂之处是分页 -- 大数据主干可能无法一次检索 -- 许多数据源(例如Elasticsearch)为大数据提供多页或多轮检索。例如,Elasticsearch的默认页面大小为10,000,任何大于该大小的数据量都应使用search_after API分多轮获取。
在Kestrel 1.5.8中,我们在stix-shifter数据源接口中添加了分页支持,如下图所示。
在Kestrel 1.5.10中,我们实现了快速转换,作为stix-shifter结果转换的替代方案(如下图所示)。如果没有快速转换,stix-shifter会在摄取之前将原始数据从数据源转换为JSON(文本)形式的STIX。启用快速转换(可在stixshifter.yaml中配置)后,原始数据将转换为Pandas DataFrame(Python 原生数据结构),其性能优于JSON。
在Kestrel 1.6.0中,我们在传输和转换/摄取之间实现了异步生产者-消费者模型,以利用 stix-shifter v5 中的异步支持。不幸的是,这并没有像预期那样提高性能,因为CPU绑定的转换(在stix-shifter v5异步模式下实现)总是超时I/O绑定的传输,并在转换完成后重新启动传输。
在Kestrel 1.7.0中,我们跳过asyncio并转向多处理以处理I/O绑定和CPU绑定阶段。下图说明了多进程的数据检索过程。关键操作如下:
1.KestrelRuntime建立一个转换工作进程池(工作进程的数量可在中配置stixshifter.yaml)。
2.Kestrel调用stix-shifter将STIX模式转换为n个原生数据源查询。
3.Kestrel启动n个传输工作进程,每个工作进程都与数据源联系以执行一个本机查询并将结果取回。同一查询结果的多个页面(如果存在)将由同一个传输工作者逐个检索回来。
4.来自每个传输工作进程的每页/批次原始记录(传输结果)在到达时都会被推送到传输转换队列。
5.传输转换队列中的每批原始记录都由任何免费转换工作者拾取,使用stix-shifter转换 (JSON) 或firepit快速转换(DataFrame)进行转换,然后推送到转换接收队列。
6.Kestrel主进程从转换接收队列中获取任何转换结果并将其注入到firepit中。我们有意在主进程中序列化这个摄取阶段,以避免SQLite(默认的firepit后端)出现并行问题。
本部分内容介绍将一种横向移动监测的方法,并展示如何在Kestrel威胁狩猎平台中使用STIX-Shifter读取数据。
横向移动是指 MITRE ATT&CK Enterprise矩阵中的一种战术,从字面上看,它是指攻击者在初次访问后通过企业进行的移动。远程连接在横向移动中发挥着重要作用,我们的方法旨在检测攻击者在目标网络中横向移动的远程认证请求。
这是一种半监督异常检测方法,用于检测横向移动中发出的恶意身份验证请求。每个身份验证请求都涉及三个实体:用户、发出请求的源主机和用户请求访问的目的主机。请注意,源和目标可以相同或不同。该方法将良性认证请求建模为用户簇、源簇和目的地簇之间的连接,表示为(Cu, Cs, Cd),其中Cu、Cs和Cd分别表示用户、源和目的地的簇。
我们的方法使用良性身份验证请求作为训练数据,并将主机作为身份验证请求的来源,主机作为请求的目的地,以及用户分别进行聚类。此聚类的目标是将在相关身份验证请求中观察到的行为方面相似的实体放在一个簇中。在聚类之后,我们有许多专门用于用户、源或目的地的簇。训练继续通过迭代训练数据来推导簇间的连接。如果训练数据包括属于簇Cu的用户从属于簇Cs的主机发出的访问簇Cd中的主机的良性认证请求,则这些簇之间的连接导出为(Cu, Cs, Cd)。该方法基于训练期间导出的簇间连接进行预测。要根据良性或恶意预测请求的状态,首先要找到源、目标和用户所属的簇。然后,如果这些簇之间存在连接,则该请求被认为是良性的,否则被标记为恶意的。该方法将聚类的数量作为输入。
该方法是半监督的,应该使用一些关于一组良性请求的信息进行训练。需要以下有关身份验证请求的信息来训练我们的分类器并检测横向移动:
该信息可以表示为 STIX Cyber-Observable (SCO),如下所示。
{
"id": "observed-data--10fb1d74-41f9-4761-a72a-0002",
"type": "observed-data",
"spec_version": "2.1",
"created": "2022-12-08T00:03:19",
"modified": "2022-12-08T00:03:19",
"objects": {
"0": {
"type": "ipv4-addr",
"value": "0.0.4.66"
},
"1": {
"type": "ipv4-addr",
"value": "0.0.4.68"
},
"2": {
"type": "user-account",
"user_id": "C466@DOM1",
"status": "benign"
},
"3": {
"type": "network-traffic",
"src_ref": "0",
"dst_ref": "1"
}
},
"first_observed": "2022-10-05T00:03:19.000Z",
"last_observed": "2022-10-05T00:03:19.000Z",
"number_observed":1
}
我们假设所有请求的信息已经存储在mysql数据库中的一个表中,按照时间升序排列。该表如下图所示。第一列以时间戳格式显示生成身份验证请求的时间。第二列显示用户名和域。源主机和目的主机的IP地址存储在第三列和第四列中。最后,“状态”列显示身份验证请求的状态。此列的值为“良性”或“未知”。良性请求将用于训练方法,而具有未知状态的请求将被测试。
接下来介绍如何使用Kestrel分析,首先是下载并构建。
我们的 Kestrel 分析可在开放网络安全联盟的GitHub存储库中找到。要使用此Kestrel分析,您需要下载并构建它。下载和构建 Kestrel 分析所需的命令如下所示。
$ git clone https://github.com/opencybersecurityalliance/kestrel-analytics/lateral-movement-detection.git
$ docker build -t kestrel-analytics-detect_lateral_movement
有人可能会想到从数据库读取数据的一种简单方法是observed-data使用GET命令按类型检索它们。但是,此命令不返回网络流量,也不返回封装在观察数据对象中的用户帐户对象。
observations = GET observed-data FROM stixshifter://database WHERE [user-account:user_id != null]
DISP observations LIMIT 10
因此,我们必须以不同的方式检索用户帐户和网络流量对象。首先,我们使用GET命令检索用户帐户对象。由于该WHERE子句是强制性的,因此我们只需放置一个简单的STIX模式,该模式始终为true,如user_id != null。然后,我们使用FIND命令检索链接到用户帐户对象的网络流量对象。
users=GET user-account FROM stixshifter://database WHERE [user-account:user_id != null]
connections=FIND network-traffic LINKED users
该数据库可能包含用户发出的多个身份验证请求,或具有相同源和目标对的多个身份验证请求。但是,对于身份验证请求中涉及的每个源和目标对或用户,GET和FIND命令仅检索一个对象。为了解决这个困难,我们在变量users和connection上应用ADDOBSID转换器。此转换器返回一个表,其中包括所有用户帐户或网络流量对象的属性以及相应SDO的标识符。
connections_obs = ADDOBSID (connections)users_obs = ADDOBSID (users)
使用DISP命令,您可以看到由ADDOBSID转换器返回的表。
我们可以在表users_obs和connection_obs上应用Kestrel分析和变量观察来检测横向移动。不幸的是,表users_obs和connection_obs是不可更改的,因为ADDOBSID转换器实际上创建了一个视图。因此,我们需要将变量观测发送到Kestrel分析器,以将输出(检测结果)记录为变量观测中的自定义属性。我们向Kestrel分析器传递三个参数,分别称为“ku”、“ks”和“kd”,分别表示用户、源和目标的群集数量。
APPLY docker://detect_lateral_movement ON observations, users_obs, connections_obs WITH ku=60, ks=60, kd=60
我们的Kestrel分析通过添加更多表示源和目标主机、用户名、更新状态和簇标识符的列来修改变量观察。这些新的栏目如下图所示。请注意,列c_usr、c_src和c_dst分别表示用户、源主机和目标主机所属的簇的标识符。簇标识符的值-1表示用户或主机不属于任何簇,因为用户或主机未参与任何训练请求。如果列状态的值是“恶意的”,则认证请求被识别为横向移动。
DISP observations ATTR id, created, first_observed, source, destination, username, status LIMIT 10
Kestrel团队制作了两个入口URL,其中一个将在公有云中启动一个Kestrel沙盒实例,并在进入沙盒时打开kestrel-huntbook repo中的教程或huntbooks目录。:
以第一个入口URL为例,点击后,需要几秒钟/分钟的时间准备沙箱。当新版本第一次运行时,它可能会很慢,在后续运行中会变得更快。
然后,沙箱将启动,用户将登陆到经典的Jupyter Notebook Web界面(在幕后,沙箱是与Binder合作的在公共云中运行的容器)。沙箱安装并设置了Jupyter、Kestrel Runtime、Kestrel内核、Kestrel分析和Kestrel狩猎手册,因此用户已准备好在沙箱中探索和执行狩猎手册。
例如,用户可以单击Huntbook 5。应用Kestrel Analytics.ipynb并使用Kestrel Jupyter内核打开Huntbook。狩猎手册现在已准备好查看/编辑/执行。
人们使用 Kestrel 沙箱来学习 Kestrel、调整现有的狩猎手册、使用它们并检查执行结果。Kestrel教程中的狩猎使用开源Kestrel项目提供的预装数据(STIX捆绑包)。
除了玩狩猎手册之外,人们还可以直接在沙盒中进行狩猎。启动Kestrel沙箱后,可以通过使用文本编辑器创建名为stixshifter.yaml的stix-shifter接口配置文件来连接自己的数据源。
现在,stixshifter.yaml文件所在目录中的任何huntbook都可以使用stixshifter.yaml中定义的数据源。
要开始狩猎,请单击右上角的“新建”按钮,然后选择Kestrel作为笔记本内核。在笔记本中写入newvar = GET ipv4-addr FROM stixshifter://并按Tab键可列出(自动完成)stixshifter.yaml文件中的所有数据源。
本示例介绍了一种新的Kestrel分析方法,它使用图学习来检测横向移动。
图学习是指分析图以学习图的结构,特别是节点嵌入。节点的嵌入是一个数值向量,并被视为表示该节点的特征向量。要检测恶意远程身份验证请求,必须分析用户或主机参与的身份验证事件的模式。例如,考虑请求访问主机的用户集合或从主机请求的主机集合是很重要的。这些信息可以用图表示,定义如下:
-
对于每个用户或主机,都有一个代表该用户或主机的节点。
-
如果用户提交了访问主机的身份验证请求,则存在一条从代表用户的节点指向代表该主机的节点的边。
-
如果用户从主机提交了身份验证请求,则存在从代表该主机的节点指向代表用户的节点的边。
例如,假设有一个网络,其中有五台主机(名为C17693、C151、C1495、C1521和C305)以及三个用户(用户名分别为U748、U636和U1723) 。下图表示基于以下身份验证请求构建的图,其中,基于三个良性身份验证请求和一个恶意请求构建的小图。与恶意请求相关的节点和边是红色的:
-
请求1:用户U748向主机C17693请求访问主机C305。
-
请求2:用户U748请求主机C151访问主机C305。
-
请求3:用户U636向主机C1495请求访问主机C305。
-
请求4:用户U1723向主机C1521请求访问主机C305。
使用图学习,我们可以分析不同用户和主机之间的连接,以及由后续远程访问组成的路径,这些路径显示可到达的主机。图学习的输出是图的每个节点和边的嵌入向量,可用于区分正常和恶意的身份验证请求。
在此Kestrel分析中实现的机器学习方法是受监督的,由三个阶段组成,如下所示。
在第 1 阶段,根据所有可用的身份验证事件构建图形,无论这些事件是恶意的还是正常的,如上一节所述。在第2阶段,我们的Kestrel分析使用节点嵌入技术为每个节点导出128个元素的向量。这些向量和认证请求的时间用于为每个请求生成特征向量。阶段3是最后一个阶段,专门用于训练分类器。目前,Kestrel分析可以应用五种分类器,即SVM、逻辑回归、KNN、XGBoost和随机森林。
再来介绍在 Kestrel 中提供有关身份验证请求的数据
要使用 Kestrel 加载训练和测试数据,我们可以将数据保存到数据库中。例如,我们可以有一个包含五列的表,如图所示,图中存储有关训练和测试身份验证请求信息的数据库表。
存储在数据库中的数据可以使用STIX-shifter数据源接口读取。我们可以使用以下命令以与前面的Kestrel分析相同的方式加载数据以检测横向移动。
users = GET user-account FROM stixshifter://database WHERE [user-account:user_id != null]
connections = FIND network-traffic LINKED users
connections_obs = ADDOBSID (connections)
users_obs = ADDOBSID (users)
observations = GET observed-data FROM stixshifter://database WHERE [user-account:user_id != null]
在应用Kestrel分析之前,您需要使用以下命令下载并构建它。
$ git clone https://github.com/opencybersecurityalliance/kestrel-analytics/Graph Learning-based Lateral Movement Detection.git
$ docker build -t kestrel-analytics-detect_lm
在构建Kestrel分析并读取存储在数据库中的信息之后,我们可以分别对变量观察和表users_obs和connection_obs应用Kestrel分析。此分析采用两个输入参数。第一个叫做walkLength,它表示第二阶段随机游走的最大长度,第二个叫做classifier,它可以设置为svm、knn、logisticRegression、xgboost或randomfrierth。
APPLY docker://detect_lm ON observations, users_obs, connections_obs WITH walkLength=3, classifier=xgboost
我们的Kestrel分析通过向所有实体添加五个属性来修改变量观察,这些属性分别称为“destination”,“source”,“status”和“user_id”。在这些修改之后,每个实体将显示一个身份验证请求,其状态(恶意或良性)由实体的属性状态的值表示。
6.5使用 Kestrel 和 SysFlow 在混合云中狩猎
本章节将探索在混合云中使用Kestrel进行威胁狩猎工作,其中云工作负载由SysFlow监控。本文中的示例是在混合云中追踪APT,这是典型供应链攻击的一种变体,但以更隐蔽的方式实施。
-
Kestrel提供了用于编写、共享和重用狩猎流的语言。
-
STIX-shifter提供联合搜索来实现跨源查询。
-
Elastic为日志和警报管理提供数据存储和检索。
-
SysFlow在容器等云工作负载中提供互联可观察性。
在真正的云中部署开放狩猎堆栈之前,想要了解云中的威胁狩猎吗?按照两步说明尝试mini-stack(需要python3和docker):
1. 从 SysFlow克隆sf-deployments存储库并运行make build并make run启动 (i) 一个要监控的目标容器 (名为Attack),(ii) SysFlow容器(名为sf-collector和sf-processor),以及 (iii) Elastic容器(命名为elk_elasticsearch和elk_kibana)。
2. 安装Kestrel和STIX-shifter Elastic-ECS模块。添加一个STIX-shifter指向elk_elasticsearch上的Elasticsearch服务的数据源容器。
我们基本上只需要Elasticsearch进行数据存储和检索。SysFlow将遥测和警报数据直接流式传输到Elasticsearch,Kestrel将通过STIX-shifter直接从Elasticsearch检索数据。可以选择安装整个ELK:Elasticsearch、Logstash和Kibana。
设置Elasticsearch的选项有以下三个选项:
1.免费试用AWS/Azure/GCP上的云服务。
2.在任何公共/私有云中启动Elasticsearch容器。
3.安装并托管我们自己的专用Elasticsearch服务器。
设置 Elastic后,唯一需要保留的就是Elasticsearch 服务 IP 和端口,假设我们得到elastic.ourcloud.com:9200。
SysFlow为云工作负载提供了连接的可观察性。它使用eBPF (Falco)有效地监控系统调用,并将大量低级系统调用事件提升到FileFlow和NetworkFlow等流,以更好地组织遥测数据。SysFlow还执行实时分析,例如根据Falco规则标记ATT&CK TTP。
如上面的架构图所示,SysFlow有两个主要组件Collector和Processor,它们将作为云中的两个独立容器执行,此外还有所有现有的工作负载容器在云中进行监控。
根据云环境,例如Docker/Helm/OpenShift,我们可以按照详细的SysFlow部署说明在云中启动两个SysFlow容器。为了简化与Elasticsearch的连接,让我们克隆sf-deployment repo并使用其/integrations/elk/sf/目录中的部署脚本。
在启动SysFlow之前,我们需要修改配置告诉SysFlow:
1.要监视哪个工作负载容器?在.env.no_elk文件中设置container.name=container1以监控我们的云中名为container1的容器。此条件指示SysFlow收集器仅传递来自此容器的记录(默认情况下,SysFlow监控整个终端环境)。
2.我们的Elasticsearch服务在哪里?让我们用我们的Elasticsearch服务的凭证替换管道中的172.17.0.1:9200 elastic.ourcloud.com:9200.tee.no_elk.json文件(两个部分)。
3.Elasticsearch中的哪个索引名称用于流式传输遥测和警报?让我们用container1-sysflow-alerts和container1-sysflow-events替换掉管道.tee.no_elk.json文件中的sysflow-alerts和sysflow-events。
接下来让我们启动SysFlow容器并观察流入Elastic的数据:
make -f Makefile.no_elk run
接下来,使用STIX-shifter设置Kestrel
假设我们将在Jupyter Notebook中编写并执行Kestrel狩猎剧本,让我们在我们的狩猎机器(本地笔记本电脑或云中的专用狩猎虚拟机)上安装Kestrel Runtime。我们将按照Kestrel安装指南在Python虚拟环境中安装 Kestrel 以及最新的依赖包。
首先,在狩猎机中打开一个终端,创建一个干净的Python虚拟环境,激活它,并更新虚拟环境:
$ python -m venv huntingspace
$ . huntingspace/bin/activate
$ pip install -U pip setuptools wheel
其次,安装Kestrel Runtime及其 Jupyter Notebook内核:
$ pip install kestrel-jupyter
$ python -m kestrel_jupyter_kernel.setup
第三,安装STIX-shifter的Elastic ECS连接器:
$ pip install stix-shifter-modules-elastic-ecs
第四,定义环境变量来配置Kestrel数据源container1:
$ export STIXSHIFTER_CONTAINER1_CONNECTOR=elastic_ecs
$ export STIXSHIFTER_CONTAINER1_CONNECTION='{"host":"elastic.ourcloud.com", "port":9200, "indices":"container1-sysflow-*"}'
$ export STIXSHIFTER_CONTAINER1_CONFIG='{"auth":{"id":"VuaCfGcBCdbkQm-e5aOx", "api_key":"ui2lp2axTNmsyakw9tvNnw"}}'
我们可以从混合云添加更多数据源。如果新的数据源通过Elastic,例如由Sysmon监控的Windows系统,只需定义一组新的环境变量即可对其进行设置:
$ export STIXSHIFTER_LAPTOP1_CONNECTOR=elastic_ecs
$ export STIXSHIFTER_LAPTOP1_CONNECTION='{"host":"elastic.ourcloud.com", "port":9200, "indices":"employee-laptop-1"}'
$ export STIXSHIFTER_LAPTOP1_CONFIG='{"auth":{"id":"VuaCfGcBCdbkQm-e5aOx", "api_key":"ui2lp2axTNmsyakw9tvNnw"}}'
如果附加数据源是CarbonBlack/CrowdStrike/QRadar,我们需要像第三步中那样安装相应的STIX-shifter连接器,然后再在第四步中设置数据源。
我们的开放式狩猎堆栈已准备就绪!让我们启动 Jupyter Notebook 并进行狩猎。在我们安装和设置 Kestrel Runtime的终端中:
Jupyter 应报告要在浏览器中访问的URL。如果这是云中的专用狩猎服务器,我们可能需要配置Jupyter以允许从特定/任何IP进行访问。
在打开 Jupyter URL 的浏览器中,我们可以通过在New Notebooks下拉菜单下选择Kestrel内核来创建 Kestrel Huntbook。
让我们编写并执行(Shift+ Enter)我们的hello world狩猎步骤:
# get all `ping` processes on Oct 17, 2021 into Kestrel variable `hello`
hello = GET process FROM stixshifter:
WHERE [process:name = 'ping']
START t'2021-10-17T00:00:00Z' STOP t'2021-10-18T00:00:00Z'
数据管道正在运行。然后,让我们在会话中测试第一个狩猎步骤,使用SysFlow TTP标记简化复杂的TTP模式,以在Kestrel中编写:
exploits = GET process FROM stixshifter:
# The TTP tag simplifies enum `process:name IN ('sh', 'bash', 'csh', ...)`
# https:
WHERE [process:parent_ref.name = 'node' AND process:x_ttp_tags LIKE '%T1059%']
START t'2021-10-17T00:00:00Z' STOP t'2021-10-18T00:00:00Z'
exp_spawn = FIND process CREATED BY exploits
exp_act = GET process FROM exp_spawn WHERE [process:name != 'bash']
exp_act = SORT exp_act BY created ASC
DISP exp_act ATTR created, pid, parent_ref.pid, name, command_line
威胁狩猎的第一步是假设或陈述,定义威胁狩猎分析人员对环境中可能存在哪些威胁以及如何找到它们的想法。剩下的步骤是 -- 数据收集和处理、触发、调查和响应。KaaS 项目的目标是采用这些流程,不仅实现成功的狩猎,而且通过团队狩猎服务缩短影响时间。在下图中,NIST 网络安全框架 (CSF)检测功能是我们的目标指标,用于跟踪使用该服务的团队的改进情况。我们想将其与没有协作和持久性的单一威胁狩猎分析人员进行比较。NIST CSF中的检测功能被定义为“制定并实施适当的活动来识别网络安全事件的发生”。我们还将分析KaaS中Kestrel和OpenC2的响应和恢复。
本部分将介绍如何创建自己的Kestrel分析,以及如何与威胁狩猎社区分享它们。Kestrel中的分析是“外部函数接口”的一种形式,可以将它们视为插件:使您能够使用API、开源软件或您自己的逻辑等外部资源丰富网络安全数据的附加功能。
要开始使用Kestrel分析,首先克隆Kestrel分析存储库:
$ git clone https://github.com/opencybersecurityalliance/kestrel-analytics.git
存储库中的每个分析都有一个README.md,解释如何构建和使用它。例如,suspiciousscoring的自述文件显示了如何构建docker镜像:
$ docker build -t kestrel-analytics-susp_scoring
procs = GET process
FROM file://samplestix.json
WHERE [process:parent_ref.name = 'cmd.exe']
APPLY docker://susp_scoring ON procs
在此示例中,第一个语句创建一个名为proc的变量,它从本地STIX 2.0 Bundle中获取与STIX Pattern [process:parent_ref.name ='cmd.exe']匹配的进程类型(STIX Cyber Observable对象类型)的已分配实体。此模式匹配父名称为cmd.exe的进程。接下来,susp_scoring分析将应用于procs变量;它将对procs中的流程实体进行评分,并为每个流程添加x_su**ious_score属性(或STIX术语中的“属性”)。
[process:parent_ref.name = 'cmd.exe']cmd.exesusp_scoringprocsprocessprocsx_suspicious_score
创建您自己的分析很容易,就像Docker化任何其他代码一样。首先,我们提供了一个您可以复制的模板:
$ cp -r template/ analytics/my_analytic
$ cd analytics/my_analytic
$ docker build -t kestrel-analytics-my_analytic .
就是这样!现在Kestrel可以使用它。以下是运行Kestrel Jupyter Notebook 内核的Jupyter Notebook中的实际示例:
该示例返回一个Kestrel显示对象 -- 在本例中,是一小块HTML,上面写着“Hello World!-- Kestrel分析”。它还为输入变量的所有实体添加了一个新属性,如使用Kestrel的DISP命令procs所示:
现在让我们的分析做一些有用的事情,一些在探索性数据分析中经常使用的事情 -- 聚类。由于我们决定在 Python中实现分析,因此我们可以使用scikit-learn库来构建K-means聚类分析。
Kestrel Analytics可以从作为参数传入的任何变量添加/修改实体的属性。它不仅可以返回更新的变量,还可以返回用于构建可视化分析的显示对象。
1.它只需要一个参数或一个Kestrel变量作为输入。
2.它向输入对象添加了一个新属性:K均值算法为每个对象分配的簇编号。
3.它不会产生显示对象 -- 让我们简单地删除显示对象的代码。
分析的开发人员定义分析的需求和使用。一些分析仅适用于特定类型的实体,例如流程实体变量的susp_scoring,而一些分析可能适用于多个实体类型。
K-means需要数字输入,并且针对网络流量实体(网络流量STIX网络可观察类型)的src_byte_count和dst_byte_count等字节计数执行,它比进程实体的进程名称和命令行更有意义。为了简单起见,让我们在新的Kestrel分析中只处理一种实体类型 -- 网络流量。
让我们了解一下将应用分析的数据;我使用GET语句从笔记本电脑上的合成数据集中提取DNS连接:
使用INFO命令,我们可以看到dns_conns中的实体具有哪些属性(从而确认这个STIX包确实包含了字节计数,这是可选的):
现在我们有了一些数值数据,我们准备尝试K-means。我们需要一种方法来告诉K-means要查看哪些属性,以及要找到多少个聚类。我们可以使用Kestrel分析参数。应用命令的完整语法为:
APPLY analytics_identifier ON var1, var2, ... WITH x=1, y=abc
WITH子句允许我们向分析发送参数,因此我们将使用它来选择输入数据的属性并指定簇的数量。
Analytics参数作为环境变量传递给docker容器,因此当我们的容器启动时,我们需要读取这些值。
import os
import pandas as pd
from sklearn.cluster import KMeans
INPUT_DATA_PATH = "/data/input/0.parquet.gz"
OUTPUT_DATA_PATH = "/data/output/0.parquet.gz"
COLS = os.environ['columns']
N = os.environ['n']
def analytics(df):
cols = COLS.split(',')
n = int(N)
kmeans = KMeans(n_clusters=n).fit(df[cols])
df['cluster'] = kmeans.labels_
return df
if __name__ == "__main__":
dfi = pd.read_parquet(INPUT_DATA_PATH)
dfo = analytics(dfi)
dfo.to_parquet(OUTPUT_DATA_PATH, compression="gzip")
常量INPUT_DATA_PATH和OUTPUT_DATA_PATH的值是Kestrel中用于将Kestrel变量传入和传出docker容器的约定。Kestrel Runtime将第一个输入变量的数据(实际上是一个表)写入/data/input/0.parquet.gz。这是一个gzip压缩的Parquet文件。如果您有多个输入变量,请将该路径中的0替换为1、2等。如前所述,我们的聚类分析将只处理一个输入变量。
由于分析结果需要添加属性(或修改现有属性),因此我们以相同的格式写出结果,但不同的路径:/data/output/0.parquet.gz。
如果你以前从未处理过Parquet文件,不要担心;流行的Pandas包使读写Parquet变得轻而易举。我们的分析部分使用Pandas将输入变量读入Pandas DataFrame,调用我们的分析函数,然后写出结果。
COLS = os.environ['columns']
N = os.environ['n']
只需获取环境变量的值即可,它们的名称相同的是我们的WITH子句中的参数。
在我们的分析功能中,我们有一些工作要做。首先,我们的参数只是我们从环境中读取的字符串值。我们希望我们的属性列表(称之为列,因为现在处理的是一个Pandas DataFrame)是一个实际的Python列表,所以我们使用Python字符串方法split将它以逗号分隔为列名列表。然后,我们希望n,我们的簇数,是一个实际的整数,使用int()进行转换。
真正要做的就是将我们的数据传递给KMeans算法本身,我们在这里使用拟合方法。
一旦KMeans完成,每个输入对象的簇号(在例子中,src_byte_count和dst_byte_count对)将存储在其标签成员数组中。这就是我们想要的新属性,因此我们只需将其分配为我们的DataFrame中的新列簇。
我们返回我们修改后的DataFrame,以Parquet文件的形式写回输出路径,我们就大功告成了!我们的容器将退出,Kestrel运行时将看到我们的输出文件,并将我们变量的值替换为该文件中的表。这意味着我们的新列现在将作为Kestrel中的新属性可见。
我们现在需要稍微修改一下示例Dockerfile,因为我们要添加一些第三方Python依赖项。除此之外,我们将修复顶部的评论,但仅此而已:
FROM python:3
RUN pip install
pip install
WORKDIR /opt/analytics
ADD analytics.py .
CMD ["python", "analytics.py"]
在重新构建我们的docker镜像(使用与上面相同的docker build命令)之后,我们就可以尝试我们的分析了。它不再是一个“Hello,World”,现在它实际上在做一些分析 -- k-means聚类。
在jupyter notebook中的Kestrel会话中,我们可以使用参数运行我们的新分析:
APPLY docker://my_analytic ON dns_conns WITH n=2, columns=src_byte_count,dst_byte_count
1393个对象中只有8个被分配到簇1,所以我将这些称为outliers。
就是这样,我们使用scikit-learn和Pandas构建了一个新的Kestrel分析。现在,我们可以像数据科学家一样在Kestrel中对安全数据进行聚类!只需要docker化一小段代码,并遵循一些简单的Kestrel约定,就可以将数据传入和传出我们的容器。
是时候对其进行润色(可能使其适用于多种实体类型),并向Kestrel analytics存储库发出拉取请求(PR),以便与我们的同事,朋友和社区中的其他威胁分析分享。
本文重点从Kestrel威胁狩猎语言的开发初衷、Kestrel的构成、关键语法和用法、核心命令介绍,以及Kestrel在不同场景下的应用示例等方面进行介绍。
Kestrel作为OCA框架中的应用,充分利用了OCA框架中多个开源项目的互联互通互操作特性,实现了环境下的灵活威胁狩猎应用操作。对于构建跨组织、跨平台、跨产品的开展威胁分析等框架和场景,具有较好的参考价值。
【1】What is Kestrel? ,https://kestrel.readthedocs.io/en/stable/overview/index.html
【2】Kestrel Interfaces,https://kestrel.readthedocs.io/en/stable/language/interface.html
【3】Terminology and Concepts,https://kestrel.readthedocs.io/en/stable/language/tac.html
【4】Entity and Variable,https://kestrel.readthedocs.io/en/stable/language/eav.html
【5】Kestrel Command,https://kestrel.readthedocs.io/en/stable/language/commands.html
【6】Kestrel Data Retrieval Explained,https://opencybersecurityalliance.org/kestrel-data-retrieval-explained/
【7】A Kestrel Analytics to Detect Lateral Movement,https://opencybersecurityalliance.org/kestrel-analytics-lateral-movement/
【8】Try Kestrel in a Cloud Sandbox,https://opencybersecurityalliance.org/try-kestrel-in-a-cloud-sandbox/
【9】Detecting Malicious Remote Authentication Requests Using Graph Learning,https://opencybersecurityalliance.org/detecting-malicious-remote-authentication-events-using-graph-learning/
【10】Setting Up The Open Hunting Stack in Hybrid Cloud With Kestrel and SysFlow,https://opencybersecurityalliance.org/kestrel-sysflow-open-hunting-stack/
【11】Announcing the Team Threat Hunting Project,https://opencybersecurityalliance.org/announcing-the-team-threat-hunting-project/
【12】Building Your Own Kestrel Analytics and Sharing With the Community,https://opencybersecurityalliance.org/kestrel-custom-analytics/
最后,小小的吸粉一波,关注公众号并回复“Kestrel”,获取持续收集的Kestrel相关内容的材料。
评论