SpringBoot - Kafka的集成与使用详解3(生产者1:指定topic、partition、key等)
三、生产者1:指定 topic、partition、key 等
1,send() 方法
(1)在之前的文章中我们都是通过 KafkaTemplate 的 send() 方法指定一个 topic 发送消息,其实 send() 方法还支持其他参数,具体如下:
参数说明:
- topic:这里填写的是 Topic 的名字
- partition:这里填写的是分区的 id,其实也是就第几个分区,id 从 0 开始。表示指定发送到该分区中
- timestamp:时间戳,一般默认当前时间戳
- key:消息的键
- data:消息的数据
- ProducerRecord:消息对应的封装类,包含上述字段
- Message<?>:Spring 自带的 Message 封装类,包含消息及消息头
ListenableFuture<SendResult<K, V>> send(String topic, V data); ListenableFuture<SendResult<K, V>> send(String topic, K key, V data); ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data); ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data); ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record); ListenableFuture<SendResult<K, V>> send(Message<?> message);
(2)下面是一些简单的使用样例:
//发送带有时间戳的消息 kafkaTemplate.send("topic.hangge.demo", 0, System.currentTimeMillis(), "key1", "message"); //使用ProducerRecord发送消息 ProducerRecord record = new ProducerRecord("topic.hangge.demo", "message"); kafkaTemplate.send(record); //使用Message发送消息 Map map = new HashMap(); map.put(KafkaHeaders.TOPIC, "topic.hangge.demo"); map.put(KafkaHeaders.PARTITION_ID, 0); map.put(KafkaHeaders.MESSAGE_KEY, 0); GenericMessage message = new GenericMessage("use Message to send message",new MessageHeaders(map)); kafkaTemplate.send(message);
2,sendDefault() 方法
(1)sendDefault() 方法和 send() 方法类似,只不过它不需要传入 topic(直接使用默认 topic),该方法支持如下几种形式:
参数说明:
- partition:这里填写的是分区的 id,其实也是就第几个分区,id 从 0 开始。表示指定发送到该分区中
- timestamp:时间戳,一般默认当前时间戳
- key:消息的键
- data:消息的数据
ListenableFuture<SendResult<K, V>> sendDefault(V data); ListenableFuture<SendResult<K, V>> sendDefault(K key, V data); ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data); ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
(2)要使用 sendDefault 发送消息,首先我们需要创建一个配置类并编写一个带有默认 Topic 参数(高亮部分)的 KafkaTemplate:
@Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String servers; @Value("${spring.kafka.producer.retries}") private int retries; @Value("${spring.kafka.producer.acks}") private String acks; @Value("${spring.kafka.producer.batch-size}") private int batchSize; @Value("${spring.kafka.producer.properties.linger.ms}") private int linger; @Value("${spring.kafka.producer.buffer-memory}") private int bufferMemory; public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.ACKS_CONFIG, acks); props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.LINGER_MS_CONFIG, linger); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } public ProducerFactory<String, Object> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, Object> kafkaTemplate() { KafkaTemplate template = new KafkaTemplate<String, Object>(producerFactory()); template.setDefaultTopic("topic.hangge.default"); // 设置默认的 topic return template; } }
@RestController public class KafkaProducer { @Autowired private KafkaTemplate<String, Object> kafkaTemplate; // 发送消息 @GetMapping("/test") public void test() { kafkaTemplate.sendDefault("send default test"); } }