自 Spring Boot 2.3 起,Kafka 指标名称已更改。
kafka_consumer_records_consumed_total_records_total -> kafka_consumer_fetch_manager_records_consumed_total
kafka_consumer_records_lag_records -> kafka_consumer_fetch_manager_records_lag
kafka_consumer_records_lag_max_records -> kafka_consumer_fetch_manager_records_lag_max
kafka_consumer_fetch_latency_max_seconds -> kafka_consumer_fetch_manager_fetch_latency_max
kafka_consumer_bytes_consumed_total_bytes_total -> kafka_consumer_fetch_manager_bytes_consumed_total
kafka_consumer_records_per_request_avg_records -> kafka_consumer_fetch_manager_records_per_request_avg
kafka_consumer_heartbeat_rate_heartbeats -> kafka_consumer_coordinator_heartbeat_rate
如果您有自定义消费者工厂,请添加 customizers.orderedStream().forEach(customizer -> customizer.customize(consumerFactory));
@Bean
public ConsumerFactory<Integer, String> customConsumerFactory(ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
DefaultKafkaConsumerFactory<Integer, String> consumerFactory = new DefaultKafkaConsumerFactory<>(customConsumerConfigs());
customizers.orderedStream().forEach(customizer -> customizer.customize(consumerFactory));
return consumerFactory;
}