Kafka 作为一个分布式发布订阅的消息系统,是目前最流行的消息队列之一,批量消费在现实业务场景中可以提高 kafka 消费吞吐量。Spring 框架可以使用 @KafkaListener
注解来实现消费端批量消费的功能。
Spring for Apache Kafka
以下 Demo 基于 SpringBoot 2.1.1,spring-kafka-2.3
引入依赖
1 2 3 4 5
| <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.3.0.RELEASE</version> </dependency>
|
配置 yml 文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| spring:
    kafka:
      bootstrap-servers: 192.168.10.1:9093,192.168.10.2:9093
      consumer:
        enable-auto-commit: false
        max-poll-records: 500
        auto-offset-reset: latest
        group-id: group-dev
      listener:
        concurrency: 1
  kafka:
    leaface:
      topic: test-topic
|
Kafka 配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
| package com.leaface.test.kafka.config;
import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap; import java.util.Map;
/**
 * Kafka consumer configuration enabling batch consumption with manual,
 * per-batch offset acknowledgement.
 *
 * <p>Fixes over the original demo:
 * <ul>
 *   <li>{@code ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG} is used instead of
 *       {@code ProducerConfig.BOOTSTRAP_SERVERS_CONFIG} — the two constants share the
 *       same literal value ("bootstrap.servers"), so the original happened to work,
 *       but a consumer config must not depend on producer constants.</li>
 *   <li>Generic key type corrected from {@code Integer} to {@code String}: the factory
 *       registers {@link StringDeserializer} for keys, so typing records as
 *       {@code ConsumerRecord<Integer, String>} would cause a {@code ClassCastException}
 *       the moment a key is read. {@code <String, String>} also matches the listener
 *       methods, which consume {@code ConsumerRecord<String, String>}.</li>
 * </ul>
 */
@Configuration
@EnableKafka
@Slf4j
public class KafkaConsumerConfig {

    /** Comma-separated broker list, e.g. "host1:9093,host2:9093". */
    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    /** Upper bound on records returned by a single poll() — i.e. the max batch size. */
    @Value("${spring.kafka.consumer.max-poll-records}")
    private int maxPollRecords;

    /** Number of concurrent listener containers (consumer threads). */
    @Value("${spring.kafka.listener.concurrency}")
    private int concurrency;

    /** Must be false for manual acknowledgement to take effect. */
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private boolean enableAutoCommit;

    /** Where to start when no committed offset exists ("latest" / "earliest"). */
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    /** Consumer group id. */
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    /**
     * Listener container factory configured for batch consumption: listener methods
     * receive a {@code List<ConsumerRecord<String, String>>} and must acknowledge
     * manually (AckMode.MANUAL_IMMEDIATE commits as soon as ack.acknowledge() is called).
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        // Deliver polled records to the listener as a List instead of one at a time.
        factory.setBatchListener(true);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    /** Consumer factory backed by {@link #consumerConfigs()}. */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Raw consumer properties.
     *
     * @return mutable property map handed to {@link DefaultKafkaConsumerFactory}
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return props;
    }
}
|
Consumer 端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
|
/**
 * Single-record consumer skeleton (non-batch variant, shown for contrast with the
 * batch listener below).
 *
 * NOTE(review): the try block is empty — the per-record processing logic is meant
 * to go there. Any exception it throws is logged and swallowed so that consumption
 * of subsequent records continues.
 *
 * @param record one deserialized Kafka record (String key / String value)
 */
public void onMessage(@Payload ConsumerRecord<String, String> record) {
    try {
        // TODO: per-record processing goes here
    } catch (Exception e) {
        log.error("process msg error!", e);
    }
}
/**
 * Batch listener entry point: because the container factory sets
 * setBatchListener(true), a single invocation receives every record returned by
 * one poll() (bounded by max-poll-records). The topic name is resolved via SpEL
 * from the {@code kafka.leaface.topic} property. Delegates processing and manual
 * acknowledgement to {@code processMsg}.
 *
 * @param recordList all records from the current poll
 * @param ack        manual acknowledgment handle (AckMode.MANUAL_IMMEDIATE)
 */
@KafkaListener(topics = "#{'${kafka.leaface.topic}'}")
public void onMessage(@Payload List<ConsumerRecord<String, String>> recordList, Acknowledgment ack) {
    processMsg(recordList, ack);
}
/**
 * Processes one polled batch and then commits its offsets.
 *
 * NOTE(review): ack.acknowledge() sits in the finally block, so the batch's
 * offsets are committed even when processing throws — failed records are NOT
 * redelivered (at-most-once semantics). If redelivery on failure is required,
 * acknowledge only on success instead; confirm which behavior is intended.
 *
 * @param records records from one poll
 * @param ack     manual acknowledgment handle; acknowledge() commits immediately
 *                under AckMode.MANUAL_IMMEDIATE
 */
private void processMsg(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
    try {
        for (ConsumerRecord<String, String> record : records) {
            // TODO: per-record processing goes here
        }
    } catch (Exception e) {
        log.error("process msg error!", e);
    } finally {
        // Commits the whole batch's offsets regardless of success/failure (see note above).
        ack.acknowledge();
    }
}
|