<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka --><dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.10.RELEASE</version></dependency>
添加配置:
spring: kafka: bootstrap-servers: 172.16.3.29:9092 producer: # 发生缺点后,重发的次数。 retries: 0 # #当有多个须要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以利用的内存大小,按照字节数打算。 # batch-size: 16384 # # 设置生产者内存缓冲区的大小。 # buffer-memory: 33554432 # # 键的序列化办法 key-serializer: org.apache.kafka.common.serialization.StringSerializer # # 值的序列化办法 value-serializer: org.apache.kafka.common.serialization.StringSerializer # # acks=0 : 生产者在成功写入之前不会等待任何来自做事器的相应。 # # acks=1 : 只要集群的首领节点收到,生产者就会收到一个来自做事器成功相应。 # # acks=all :只有当所有参与复制的节点全部收到时,生产者才会收到一个来自做事器的成功相应。 # acks: 1 consumer: group-id: test # # 自动提交的韶光间隔 在spring boot 2.X 版本中这里采取的是值的类型为Duration 须要符合特定的格式,如1S,1M,2H,5D # auto-commit-interval: 1S # # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情形下该作何处理: # # latest(默认值)在偏移量无效的情形下,消费者将从最新的记录开始读取数据(在消费者启动之后天生的记录) # # earliest :在偏移量无效的情形下,消费者将从起始位置读取分区的记录 # auto-offset-reset: earliest # # 是否自动提交偏移量,默认值是true,为了避免涌现重复数据和数据丢失,可以把它设置为false,然夹帐动提交偏移量 # enable-auto-commit: false # # 键的反序列化办法 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # # 值的反序列化办法 value-deserializer: org.apache.kafka.common.serialization.StringDeserializer max-poll-records: 150 listener: # # 在侦听器容器中运行的线程数。 # concurrency: 5 # #listner卖力ack,每调用一次,就立即commit # ack-mode: manual_immediate missing-topics-fatal: false
二、测试做事改造2.1 生产者
供应一个大略的生产者工具类,只有大略的发送一个方法,参数是 topic 和 message。import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.stereotype.Component;/ kafka生产者 @author weirx @date 2021/02/03 14:22 /@Componentpublic class KafkaProducer { @Autowired private KafkaTemplate kafkaTemplate; / kafka发送 @param @author weirx @return void @date: 2021/2/3 / public void send(String topic,String message){ kafkaTemplate.send(topic,message); }}
2.2 下单接口改造
引入KafkaProducer,下单时更换http要求成kafka推送,伪代码如下:
@Autowiredprivate KafkaProducer kafkaProducer;kafkaProducer.send("rob-necessities-order",JSONObject.toJSONString(map));
2.3 支付接口改造
在前面一篇文章当中,支付接口是作为下单接口的回调接口被调用的,实在是一个完备同步的接口,易涌现问题,如壅塞,要求失落败、超时等。以是此处我们也将其改造成kafka异步消费,消费者工具类如下所示:

@Slf4j@Componentpublic class KafkaConsumer { @Autowired private TradingServiceImpl tradingService; @KafkaListener(topics = {"rob-necessities-trading"}) public void consumer(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); String orderId = message.toString(); log.info("支付开始韶光:{},订单id: {}", LocalDateTime.now(), orderId); tradingService.pay(Long.valueOf(orderId)); log.info("支付完成韶光/////////////////////////:{},订单id: {}", LocalDateTime.now(), orderId); } }}
三、订单做事改造2.1 支付回调改造
前面的支付办法,我们是在订单完成后通过http接口形式,现在改用kafka,以是我们须要供应一个kafka生产者,将同送到测试做事:
@Componentpublic class KafkaProducer { @Autowired private KafkaTemplate kafkaTemplate; / kafka发送 @param @author weirx @return void @date: 2021/2/3 / public void send(String topic,String message){ kafkaTemplate.send(topic,message); }}
2.2 下单接口改造
接口下单命令的办法不再是等待http要求调用了,此处变成监听kafka,供应消费者如下:
@Slf4j@Componentpublic class KafkaConsumer { @Autowired private OrderService orderService; @KafkaListener(topics = {"rob-necessities-order"}) public void consumer(ConsumerRecord<?, ?> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { Object message = kafkaMessage.get(); log.info("----------------- record =" + record); log.info("------------------ message =" + message); JSONObject jsonObject = JSONObject.parseObject(message.toString()); OrderDTO orderDTO = jsonObject.toJavaObject(OrderDTO.class); orderService.saveOrder(orderDTO); } }}
由于我们已经利用了kafka作为并发时流量缓冲的组件,就不再须要我们前面自己添加进来的行列步队了,以是改造后的下单接口如下所示:
@Autowiredprivate KafkaProducer kafkaProducer;@Overridepublic Result saveOrder(OrderDTO orderDTO) { // 下单实现 Result result = this.saveOrderImpl(orderDTO); String orderId = JSONObject.parseObject(JSONObject.toJSONString(result.getData())).getString("id"); kafkaProducer.send("rob-necessities-trading", orderId); return Result.success("下单成功");}
如上所示,详细的订单业务逻辑没有变革。
四、测试kafka是单节点的,在其他做事器上,由于我本地没有内存了。全部完成韶光大概是23秒,韶光上有些许增加,但是整体吞吐量跟以前绝对不是一个量级的了。其余,我们在调用支付接口的时候也可以通过kafka的形式,但是本文不做修正了。
作者:我犟不过你链接:https://juejin.cn/post/7068450775743070244