1、导包:除了 Spring Boot 之外,还需要额外导入 Spring Web、Kafka
2、编写配置文件
# Kafka broker address (host:port) used by both the producer and the consumer.
# NOTE(review): hard-coded IP - presumably specific to the author's environment; replace before reuse.
spring.kafka.bootstrap-servers=47.106.86.64:9092
# Serializers and deserializers: keys and values are all handled as plain Strings.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Consumer group id for the consumer side.
spring.kafka.consumer.group-id=test2
3、定义简单生产者
/**
 * Minimal REST-facing Kafka producer: forwards a request parameter to the "first" topic.
 */
@RestController
public class ProducerController {

    /** Template used to publish String-keyed, String-valued records; injected by Spring. */
    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends {@code msg} to the "first" topic without waiting for the send result.
     *
     * @param msg the text to publish, bound from the request
     * @return the literal string {@code "ok"}
     */
    @RequestMapping("/hi")
    public String data(String msg) {
        final String topic = "first";
        kafkaTemplate.send(topic, msg);
        return "ok";
    }
}
编写具有回调函数的生产者
/**
 * Sends the message to the "first" topic and prints the outcome from a pair of
 * success/failure lambdas.
 *
 * <p>On success the record's topic, partition and offset are printed; on failure the
 * exception message is printed.
 *
 * @param callbackMessage the text to publish, bound from the {@code message} path variable
 */
@GetMapping("/kafka/callbackOne/{message}")
public void sendMessage2(@PathVariable("message") String callbackMessage) {
    kafkaTemplate.send("first", callbackMessage).addCallback(
            // Success: topic - partition - offset of the written record.
            result -> System.out.println("发送消息成功:"
                    + result.getRecordMetadata().topic() + "-"
                    + result.getRecordMetadata().partition() + "-"
                    + result.getRecordMetadata().offset()),
            // Failure: only the exception message is reported.
            ex -> System.out.println("发送消息失败:" + ex.getMessage()));
}
/**
 * Sends the message to the "first" topic and prints the outcome from an explicit
 * {@link ListenableFutureCallback}.
 *
 * <p>The callback is parameterized with {@code SendResult<String, String>} so that it matches
 * the {@code KafkaTemplate<String, String>} used for sending. The previous version declared
 * {@code SendResult<String, Object>} and forced it through an unchecked cast.
 *
 * @param callbackMessage the text to publish, bound from the {@code message} path variable
 */
@GetMapping("/kafka/callbackTwo/{message}")
public void sendMessage3(@PathVariable("message") String callbackMessage) {
    kafkaTemplate.send("first", callbackMessage).addCallback(
            new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onFailure(Throwable ex) {
                    System.out.println("发送消息失败:" + ex.getMessage());
                }

                @Override
                public void onSuccess(SendResult<String, String> result) {
                    // Report where the record landed: topic - partition - offset.
                    System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
                            + result.getRecordMetadata().partition() + "-"
                            + result.getRecordMetadata().offset());
                }
            });
}
4、定义消费者 →启动监听线程
/**
 * Hosts the Kafka listener method that consumes the "first" topic.
 */
@Configuration
public class KafkaConfiguration {

    /**
     * Prints the topic, partition and value of each record received from the "first" topic.
     *
     * @param record the consumed record
     */
    @KafkaListener(topics = {"first"})
    public void message1(ConsumerRecord<?, ?> record) {
        // Where the record came from, followed by its payload.
        final String origin = record.topic() + "-" + record.partition();
        System.out.println("点对点消费1:" + origin + "-" + record.value());
    }
}
参考 :https://blog.csdn.net/prague6695/article/details/123869202
https://mp.weixin.qq.com/s/ZG9e6-81cXhDSK4p05gR3A
https://blog.csdn.net/weixin_55229531/article/details/125135400
ate;
@RequestMapping("/hi")
public String data(String msg){
kafkaTemplate.send("first",msg);
return "ok";
}
}
编写具有回调函数的生产者
```java
/**
 * Sends the message to the "first" topic and prints the outcome from a pair of
 * success/failure lambdas.
 *
 * <p>On success the record's topic, partition and offset are printed; on failure the
 * exception message is printed.
 *
 * @param callbackMessage the text to publish, bound from the {@code message} path variable
 */
@GetMapping("/kafka/callbackOne/{message}")
public void sendMessage2(@PathVariable("message") String callbackMessage) {
    kafkaTemplate.send("first", callbackMessage).addCallback(
            // Success: topic - partition - offset of the written record.
            result -> System.out.println("发送消息成功:"
                    + result.getRecordMetadata().topic() + "-"
                    + result.getRecordMetadata().partition() + "-"
                    + result.getRecordMetadata().offset()),
            // Failure: only the exception message is reported.
            ex -> System.out.println("发送消息失败:" + ex.getMessage()));
}
/**
 * Sends the message to the "first" topic and prints the outcome from an explicit
 * {@link ListenableFutureCallback}.
 *
 * <p>The callback is parameterized with {@code SendResult<String, String>} so that it matches
 * the {@code KafkaTemplate<String, String>} used for sending. The previous version declared
 * {@code SendResult<String, Object>} and forced it through an unchecked cast.
 *
 * @param callbackMessage the text to publish, bound from the {@code message} path variable
 */
@GetMapping("/kafka/callbackTwo/{message}")
public void sendMessage3(@PathVariable("message") String callbackMessage) {
    kafkaTemplate.send("first", callbackMessage).addCallback(
            new ListenableFutureCallback<SendResult<String, String>>() {
                @Override
                public void onFailure(Throwable ex) {
                    System.out.println("发送消息失败:" + ex.getMessage());
                }

                @Override
                public void onSuccess(SendResult<String, String> result) {
                    // Report where the record landed: topic - partition - offset.
                    System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
                            + result.getRecordMetadata().partition() + "-"
                            + result.getRecordMetadata().offset());
                }
            });
}
```
4、定义消费者 →启动监听线程
/**
 * Hosts the Kafka listener method that consumes the "first" topic.
 */
@Configuration
public class KafkaConfiguration {

    /**
     * Prints the topic, partition and value of each record received from the "first" topic.
     *
     * @param record the consumed record
     */
    @KafkaListener(topics = {"first"})
    public void message1(ConsumerRecord<?, ?> record) {
        // Where the record came from, followed by its payload.
        final String origin = record.topic() + "-" + record.partition();
        System.out.println("点对点消费1:" + origin + "-" + record.value());
    }
}