当前位置: > > > SpringBoot - Kafka的集成与使用详解3(生产者1:指定topic、partition、key等)

SpringBoot - Kafka的集成与使用详解3(生产者1:指定topic、partition、key等)

三、生产者1:指定 topic、partition、key 等

1,send() 方法

(1)在之前的文章中我们都是通过 KafkaTemplate send() 方法指定一个 topic 发送消息,其实 send() 方法还支持其他参数,具体如下:
参数说明:
  • topic:这里填写的是 Topic 的名字
  • partition:这里填写的是分区的 id,其实也是就第几个分区,id 0 开始。表示指定发送到该分区中
  • timestamp:时间戳,一般默认当前时间戳
  • key:消息的键
  • data:消息的数据
  • ProducerRecord:消息对应的封装类,包含上述字段
  • Message<?>Spring 自带的 Message 封装类,包含消息及消息头
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);

(2)下面是一些简单的使用样例:
//发送带有时间戳的消息
kafkaTemplate.send("topic.hangge.demo", 0, System.currentTimeMillis(), "key1", "message");

//使用ProducerRecord发送消息
ProducerRecord record = new ProducerRecord("topic.hangge.demo", "message");
kafkaTemplate.send(record);

//使用Message发送消息
Map map = new HashMap();
map.put(KafkaHeaders.TOPIC, "topic.hangge.demo");
map.put(KafkaHeaders.PARTITION_ID, 0);
map.put(KafkaHeaders.MESSAGE_KEY, 0);
GenericMessage message = new GenericMessage("use Message to send message",new MessageHeaders(map));
kafkaTemplate.send(message);

2,sendDefault() 方法

(1)sendDefault() 方法和 send() 方法类似,只不过它不需要传入 topic(直接使用默认 topic),该方法支持如下几种形式:
参数说明:
  • partition:这里填写的是分区的 id,其实也是就第几个分区,id 0 开始。表示指定发送到该分区中
  • timestamp:时间戳,一般默认当前时间戳
  • key:消息的键
  • data:消息的数据
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

(2)要使用 sendDefault 发送消息,首先我们需要创建一个配置类并编写一个带有默认 Topic 参数(高亮部分)的 KafkaTemplate
@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;
    @Value("${spring.kafka.producer.retries}")
    private int retries;
    @Value("${spring.kafka.producer.acks}")
    private String acks;
    @Value("${spring.kafka.producer.batch-size}")
    private int batchSize;
    @Value("${spring.kafka.producer.properties.linger.ms}")
    private int linger;
    @Value("${spring.kafka.producer.buffer-memory}")
    private int bufferMemory;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate template = new KafkaTemplate<String, Object>(producerFactory());
        template.setDefaultTopic("topic.hangge.default"); // 设置默认的 topic
        return template;
    }
}
 
(3)然后我们调用 sendDefault 方法发送数据,虽然没有指定 topic,但 kafkaTemplate 会自动把消息发送到默认的 Topic 中:
@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // 发送消息
    @GetMapping("/test")
    public void test() {
        kafkaTemplate.sendDefault("send default test");
    }
}
评论0