Kafka 消费者
Kafka 消费者(Consumer)负责从 Kafka 的主题(Topic)中读取消息。消费者是 Kafka 数据流的接收端,能够根据一定的配置和策略来获取数据并进行处理。
1. Kafka 消费者的基本原理
Kafka 消费者有以下主要特性:
- 消费组(Consumer Group): 消费者可以通过加入消费组来实现消息的分发,确保消息的高效消费和负载均衡。
- 偏移量(Offset): Kafka 消费者在每个分区中都有一个偏移量,指示消息的消费进度。偏移量由 Kafka 维护,也可以由消费者手动控制。
- 消息顺序: Kafka 保证每个分区内的消息顺序,但不同分区的消息不保证顺序。
- 重复消费与幂等性: 消费者可以通过配置和设计来避免消息的重复消费。
2. Kafka 消费者配置
Kafka 消费者的配置决定了其行为,最常用的配置如下:
| Kafka 集群地址,用于初始化消费者客户端。 |
|
| 消费者所属的消费组 ID,Kafka 根据消费组来协调消息的消费。 |
|
| 消息键的反序列化类。 |
|
| 消息值的反序列化类。 |
|
| 消费者如何处理偏移量,如果没有偏移量或偏移量无效时的处理方式。 |
|
| 是否自动提交偏移量。 |
|
| 自动提交偏移量的时间间隔。 |
|
| 消费者与 Kafka 集群之间的会话超时。 |
|
| 每次拉取的最大消息数量。 |
|
3. Kafka 消费者代码实现
基础示例:简单消费者
这是一个简单的 Kafka 消费者示例,它会从 test-topic 主题中消费消息并输出:
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
public class SimpleKafkaConsumer {
public static void main(String[] args) {
// 配置消费者属性
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka 集群地址
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); // 消费者组 ID
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息键反序列化类
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消息值反序列化类
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 如果没有偏移量,自动从最早消息开始消费
// 创建消费者实例
Consumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList("test-topic"));
// 拉取消息并处理
try {
while (true) {
consumer.poll(100).forEach(record -> {
System.out.printf("Consumed record: Key=%s, Value=%s, Offset=%d%n", record.key(), record.value(), record.offset());
});
}
} finally {
consumer.close();
}
}
}
带手动偏移量管理的消费者
手动提交偏移量,不依赖 Kafka 自动提交。这可以通过 enable.auto.commit = false 来实现,并使用 commitSync() 或 commitAsync() 方法手动提交偏移量。
import org.apache.kafka.clients.consumer.*;
import java.util.Collections;
import java.util.Properties;
public class ManualOffsetConsumer {
public static void main(String[] args) {
// 配置消费者属性
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-offset-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 禁用自动提交
// 创建消费者实例
Consumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList("test-topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Consumed record: Key=%s, Value=%s, Offset=%d%n", record.key(), record.value(), record.offset());
}
// 手动提交偏移量
consumer.commitSync(); // 确保提交
}
} finally {
consumer.close();
}
}
}
4. Kafka 消费者的高级特性
消费者组与负载均衡
Kafka 消费者通过 消费组(Consumer Group) 实现消息的并行处理。当多个消费者属于同一个消费组时,Kafka 会将消息分配给消费组中的每个消费者处理,每个消费者处理不同分区的数据。消费者组中的每个消费者只会消费自己分配到的分区。
- 每个分区只能被同一消费组内的一个消费者消费。
- 如果消费组中的消费者数量超过分区数量,部分消费者将空闲。
- 如果消费者故障,Kafka 会将分区重新分配给其他消费者。
动态重新平衡
Kafka 会动态地重新平衡消费者组中的消费者和分区分配。消费者可以监听消费组的变化,并及时响应。
consumer.subscribe(Collections.singletonList("test-topic"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 当分区被撤销时执行
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 当分区被分配时执行
}
});
5. Kafka 消费者的性能优化
配置优化
fetch.min.bytes: 最小拉取消息的字节数。设置该参数可以提高性能,但会增加延迟。fetch.max.wait.ms: 拉取请求的最大等待时间。合理配置可以减少请求的数量,提升吞吐量。max.poll.records: 每次拉取的最大记录数。适当设置可以避免消费者被过多消息阻塞。
批量处理
通过批量拉取消息并批量处理,可以减少消费的延迟并提高吞吐量。
consumer.poll(100).forEach(record -> {
// 批量处理逻辑
});
消费者并发
可以通过启动多个消费者实例来并行消费消息,增加消息处理的吞吐量。
6. 消费者常见问题
- 重复消费: 如果
enable.auto.commit = false或出现消费者崩溃并重新启动,可能会导致消息重复消费。可以通过存储偏移量来避免重复消费。 - 偏移量丢失: 如果消费者的偏移量丢失或无法找到,可能会导致消息丢失。可以通过设置合理的
auto.offset.reset参数来控制这种情况。
详细参考:https://blog.csdn.net/2401_83413238/article/details/145354804?spm=1001.2014.3001.5502
Kafka的一些碎碎念,哈哈哈哈哈