1. 导包 -除了Spring Boot 之外还需要额外导入 Spring Web、Kafka

image-20220903182633431

2, 编写配置文件

spring.kafka.bootstrap-servers=47.106.86.64:9092
 
#序列化器 以及反序列化器
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
 
spring.kafka.consumer.group-id=test2

3、定义简单生产者

@RestController
public class ProducerController {
    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;
 
    @RequestMapping("/hi")
    public String  data(String msg){
        kafkaTemplate.send("first",msg);
        return "ok";
    }
}

编写具有回调函数的生产者

@GetMapping("/kafka/callbackOne/{message}")
public void sendMessage2(@PathVariable("message") String callbackMessage) {
    kafkaTemplate.send("first", callbackMessage).addCallback(success -> {
        // 消息发送到的topic
        String topic = success.getRecordMetadata().topic();
        // 消息发送到的分区
        int partition = success.getRecordMetadata().partition();
        // 消息在分区内的offset
        long offset = success.getRecordMetadata().offset();
        System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
    }, failure -> {
        System.out.println("发送消息失败:" + failure.getMessage());
    });
}
@GetMapping("/kafka/callbackTwo/{message}")
public void sendMessage3(@PathVariable("message") String callbackMessage) {
    kafkaTemplate.send("first", callbackMessage).addCallback(
        (ListenableFutureCallback<? super SendResult<String, String>>) new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("发送消息失败:"+ex.getMessage());
            }
 
            @Override
            public void onSuccess(SendResult<String, Object> result) {
                System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
                                   + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
            }
        });
}

4、定义消费者 启动监听线程

@Configuration
public class KafkaConfiguration{
 
 
    @KafkaListener(topics = {"first"})
    public void message1(ConsumerRecord<?, ?> record){
        // 消费的哪个topic、partition的消息,打印出消息内容
        System.out.println("点对点消费1:"+record.topic()+"-"+record.partition()+"-"+record.value());
    }
 
}

参考 :https://blog.csdn.net/prague6695/article/details/123869202

https://mp.weixin.qq.com/s/ZG9e6-81cXhDSK4p05gR3A

https://blog.csdn.net/weixin_55229531/article/details/125135400
ate;

@RequestMapping("/hi")
public String  data(String msg){
    kafkaTemplate.send("first",msg);
    return "ok";
}

}


编写具有回调函数的生产者

```java
@GetMapping("/kafka/callbackOne/{message}")
public void sendMessage2(@PathVariable("message") String callbackMessage) {
    kafkaTemplate.send("first", callbackMessage).addCallback(success -> {
        // 消息发送到的topic
        String topic = success.getRecordMetadata().topic();
        // 消息发送到的分区
        int partition = success.getRecordMetadata().partition();
        // 消息在分区内的offset
        long offset = success.getRecordMetadata().offset();
        System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
    }, failure -> {
        System.out.println("发送消息失败:" + failure.getMessage());
    });
}
@GetMapping("/kafka/callbackTwo/{message}")
public void sendMessage3(@PathVariable("message") String callbackMessage) {
    kafkaTemplate.send("first", callbackMessage).addCallback(
        (ListenableFutureCallback<? super SendResult<String, String>>) new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("发送消息失败:"+ex.getMessage());
            }
 
            @Override
            public void onSuccess(SendResult<String, Object> result) {
                System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
                                   + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
            }
        });
}

4、定义消费者 启动监听线程

@Configuration
public class KafkaConfiguration{
 
 
    @KafkaListener(topics = {"first"})
    public void message1(ConsumerRecord<?, ?> record){
        // 消费的哪个topic、partition的消息,打印出消息内容
        System.out.println("点对点消费1:"+record.topic()+"-"+record.partition()+"-"+record.value());
    }
 
}