1、Kafka集群整体结构
简单介绍图中关系:
(1) Topic只是一个逻辑概念,Producer和Consumer通过Topic进行业务沟通。
(2) Topic并不存储数据,Topic下的数据分为多组Partition,尽量平均分散到各个Broker上。每组Partition包含Topic下一部分的消息。每组Partition包含一个Leader Partition以及若干个Follower Partition进行备份,每组Partition的个数称为备份因子 replica factor。
(3) Producer将消息发送到对应的Partition上,Consumer通过Partition上的Offset偏移量,记录自己所属消费者组Group在当前Partition上消费消息的进度。
(4) Producer发送给一个Topic的消息,将由Kafka推送给所有订阅了这个Topic的消费者组进行处理。但是在每个消费者组内部,只会有一个消费者实例处理这一条消息。
(5) Kafka的Broker通过Zookeeper组成集群。从这些Broker中,选举产生一个担任Controller角色的Broker。这个Controller的主要任务就是负责Topic的分配以及后续管理工作。在实验的集群中,这个Controller实际上是通过ZooKeeper产生的。
2、基础客户端API实战
Kafka提供了两套客户端API,HighLevel API和LowLevel API。
HighLevel API封装了Kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。
LowLevel API则需要客户端自己管理Kafka的运行细节,Partition、Offset这些数据需要由客户端自行管理。这层API功能更灵活,但使用起来比较复杂,也容易出错。只在极少数对性能要求非常极致的场景才会去使用。
使用Kafka客户端API,只需要引入一个Maven依赖即可:
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId><version>3.8.0</version></dependency>
2.1、消息发送者流程
可以使用Kafka提供的Producer类,快速发送消息。
构建Producer可分为三个步骤:
(1) 设置Producer核心属性
Producer的可选属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTSTRAP_SERVERS_CONFIG属性,显然就是指发送者要将消息发送到只此Kafka集群上。这是每个Producer必选的属性。在ProducerConfig中,对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。
(2) 构建消息
Kafka的消息是一个Key-Value结构的消息。其中,key和value都可以是任意对象类型。其中,key主要是用来做Partition分区的,业务上更关心的是value。
(3) 使用Producer发送消息
通常用到的就是单向发送、同步发送和异步发送三种发送方式。
importorg.apache.kafka.clients.producer.*;importorg.apache.kafka.common.serialization.StringSerializer;importjava.util.Properties;importjava.util.concurrent.CountDownLatch;importjava.util.concurrent.ExecutionException;publicclassMyProducer {privatestaticfinalStringBOOTSTRAP_SERVERS="kf1:9092,kf2:9092,kf3:9092";privatestaticfinalStringTOPIC="disTopic";publicstaticvoidmain(String[] args) throwsExecutionException, InterruptedException {// 1、设置发送者相关属性Propertiesprops=newProperties();// 此处配置的是kafkaprops.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 用于控制异步发送消息,个数与for循环的个数相同// CountDownLatch latch = new CountDownLatch(5);Producer<String, String>producer=newKafkaProducer<>(props);for (inti=0; i<5; i++) {// 2、构建消息ProducerRecord<String, String>record=newProducerRecord<String, String>(TOPIC, Integer.toString(i), "MyProducer"+i);// 3、发送消息// 单向发送:不关心服务端的应答producer.send(record);System.out.println("Sent record: "+i );// 同步发送:获取服务端应答消息前,会阻塞当前线程// RecordMetadata recordMetadata = producer.send(record).get();// String topic = recordMetadata.topic();// int partition = recordMetadata.partition();// long offset = recordMetadata.offset();// String message = recordMetadata.toString();// System.out.println("message: [ " + message + " ] sent with topic: " + topic + ", partition: " + partition + ", offset: " + offset);// 异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数// producer.send(record, new Callback() {//// @Override// public void onCompletion(RecordMetadata recordMetadata, Exception e) {// if (e != null) {// System.out.println("消息发送失败,"+ e.getMessage());// e.printStackTrace();// }else {// String topic = recordMetadata.topic();// long offset = recordMetadata.offset();// int partition = recordMetadata.partition();// String message = recordMetadata.toString();// System.out.println("message: [ " + message + " ] sent with topic: " + topic + ", partition: " + partition + ", offset: " + offset);// }// latch.countDown();// }// }); }// // 消息处理完才停止发送者// latch.await();producer.close(); }}
以单向发送消息为示例:
2.2、消息消费者主流程
使用Kafka提供的Consumer类,快速消费消息。
Consumer同样是可分为三个步骤:
(1) 设置Consumer核心属性
可选的属性都由ConsumerConfig类管理。在这个 类中,同样对于大部分比较重要的属性,都配置了对应 的DOC属性进行描述。同样BOOTSTRAP_SERVERS_CONFIG是必须设置的属性。
(2) 拉取消息
Kafka采用Consumer主动拉取消息的Pull模式。Consumer主动从Broker上拉取一批感兴趣的消息。
(3) 处理消息,提交位点
消费者将消息拉取完成后,不可以交由业务自行处理对应的这一批消息。只是消费者需要向Broker提交偏移量Offset。如果不提交Offset,Broker会认为消费者端消息处理失败了,还会重复推送。
Kafka的客户端基本就是固定的按照这三个大的步骤运行。在具体使用过程中,最大的变数基本上就是给生产者和消费者的设定合适属性。这些属性设置会影响客户端程序的执行方式。
importorg.apache.kafka.clients.consumer.*;importorg.apache.kafka.common.serialization.StringDeserializer;importjava.time.Duration;importjava.util.Arrays;importjava.util.Properties;publicclassMyConsumer {privatestaticfinalStringBOOTSTRAP_SERVERS="kf1:9092,kf2:9092,kf3:9092";privatestaticfinalStringTOPIC="disTopic";privatestaticfinalStringGROUP_ID="testGroup";publicstaticvoidmain(String[] args) {// 1、设置发送者相关属性Propertiesprops=newProperties();// kafka地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);// 每个消费者要指定一个groupprops.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);// key序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// value序列化类props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());Consumer<String,String>consumer=newKafkaConsumer<>(props);consumer.subscribe(Arrays.asList(TOPIC));while(true) {// 2、拉取消息//100毫秒超时时间ConsumerRecords<String,String>records=consumer.poll(Duration.ofNanos(100));// 3、处理消息for (ConsumerRecord<String,String>record : records) {System.out.println("offset: "+record.offset() +", key: "+record.key() +", value: "+record.value()); }// 提交offset,消息就不会重复推送consumer.commitSync(); // 同步提交,表示必须等到offset提交完毕,再去消费下一批数据。// consumer.commitAsync(); // 异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。 } }}
消息消费示例:
Kafka单机服务搭建,请参考《为什么要用Kafka?单机服务搭建&三步消息交互》
Kafka集群服务搭建,请参考《Kafka-为什么要用集群?基于ZK搭建Kafka集群》
3、我的公众号&资料获取
敬请关注我的公众号:大象只为你,持续更新技术知识......
相关资料获取:
如需SpringKafka项目Demo,请后台回复:【springkafka】。
我的代码是基于SpringBoot3.x上写的,IDEA 2024版
原文始发于微信公众号(大象只为你):Kafka-一图读懂集群整体结构&基础客户端API实战
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论