本文共 3273 字,大约阅读时间需要 10 分钟。
目前公司做的是工业互联网相关的产品,从emq网关接收到数据后,emq直接将数据发送到后面的服务进行处理,这里需要进行解耦,肯定需要mq消息队列进行缓冲,在这里,公司采用kafka进行解耦。
两个原因: 第一,kafka的数据不丢失,以log的形式保存到磁盘中。在一实际生产测试中发现生产数据持续抛6天,磁盘增长23G,平均每天增长3.8G左右。 第二,kafka当中有partion和consumer组的概念,支持并行消费处理 下面就简单写个demo记录下,利用springboot 使用kafka的方式只需要引入
org.springframework.kafka spring-kafka
完整pom文件如下
4.0.0 org.springframework.boot spring-boot-starter-parent 2.4.1 com.cch kafkademo 0.0.1-SNAPSHOT kafkademo Demo project for Spring Boot 1.8 org.springframework.boot spring-boot-starter org.projectlombok lombok true org.springframework.boot spring-boot-starter-test test org.springframework.kafka spring-kafka org.springframework.boot spring-boot-maven-plugin org.projectlombok lombok
missing-topics-fatal: false #自动创建topic 这个配置非常实用
spring: kafka: producer: retries: 0 acks: 1 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer bootstrap-servers: 11.8.7.248:9092 consumer: enable-auto-commit: true auto-commit-interval: 1000 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer auto-offset-reset: latest max-poll-records: 100 listener: missing-topics-fatal: false #自动创建topiccch: kafka: consumer: id: kafkademo-group-id #消费者Id topic: xxx #消费的topic
@Datapublic class ReportDTO { private String id;}
@Component@Slf4jpublic class Consumer { @KafkaListener(id = "${cch.kafka.consumer.id}", topics = { "${cch.kafka.consumer.topic}"}) public void listen(ConsumerRecord record) { log.info("Enter to get msg from kafka"); if (record.value() != null) { try { String jsonStr = JSONUtil.toJsonStr(record.value()); ReportDTO reportDto = JSONUtil.toBean(jsonStr, ReportDTO.class); log.info("收到kafka的消息为: " + reportDto); } catch (Exception e) { log.error("Transfer kafka value to ReportDTO failed, ex:{}", e.getMessage()); } } log.info("Leave to get msg from kafka"); }}
转载地址:http://lkrii.baihongyu.com/