博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
springboot kafka
阅读量:4100 次
发布时间:2019-05-25

本文共 3273 字,大约阅读时间需要 10 分钟。

文章目录

前言

目前公司做的是工业互联网相关的产品,从emq网关接收到数据后,emq直接将数据发送到后面的服务进行处理,这里需要进行解耦,肯定需要mq消息队列进行缓冲,在这里,公司采用kafka进行解耦。

两个原因:
第一,kafka的数据不丢失,以log的形式保存到磁盘中。在一实际生产测试中发现生产数据持续抛6天,磁盘增长23G,平均每天增长3.8G左右。
第二,kafka当中有partion和consumer组的概念,支持并行消费处理
下面就简单写个demo记录下,利用springboot 使用kafka的方式

一、工程目录结构

在这里插入图片描述

二、pom文件

只需要引入

org.springframework.kafka
spring-kafka

完整pom文件如下

4.0.0
org.springframework.boot
spring-boot-starter-parent
2.4.1
com.cch
kafkademo
0.0.1-SNAPSHOT
kafkademo
Demo project for Spring Boot
1.8
org.springframework.boot
spring-boot-starter
org.projectlombok
lombok
true
org.springframework.boot
spring-boot-starter-test
test
org.springframework.kafka
spring-kafka
org.springframework.boot
spring-boot-maven-plugin
org.projectlombok
lombok

二、application.yml

missing-topics-fatal: false #自动创建topic 这个配置非常实用

spring:  kafka:    producer:      retries: 0      acks: 1      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer    bootstrap-servers: 11.8.7.248:9092    consumer:      enable-auto-commit: true      auto-commit-interval: 1000      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer      auto-offset-reset: latest      max-poll-records: 100    listener:      missing-topics-fatal: false  #自动创建topiccch:  kafka:    consumer:      id: kafkademo-group-id  #消费者Id      topic: xxx  #消费的topic

三、reportDTO

@Datapublic class ReportDTO {
private String id;}

四、consumer

@Component@Slf4jpublic class Consumer {
@KafkaListener(id = "${cch.kafka.consumer.id}", topics = {
"${cch.kafka.consumer.topic}"}) public void listen(ConsumerRecord
record) {
log.info("Enter to get msg from kafka"); if (record.value() != null) {
try {
String jsonStr = JSONUtil.toJsonStr(record.value()); ReportDTO reportDto = JSONUtil.toBean(jsonStr, ReportDTO.class); log.info("收到kafka的消息为: " + reportDto); } catch (Exception e) {
log.error("Transfer kafka value to ReportDTO failed, ex:{}", e.getMessage()); } } log.info("Leave to get msg from kafka"); }}

转载地址:http://lkrii.baihongyu.com/

你可能感兴趣的文章
剑指offer算法题分析与整理(三)
查看>>
JVM并发机制探讨—内存模型、内存可见性和指令重排序
查看>>
WPF中PATH使用AI导出SVG的方法
查看>>
java LinkedList与ArrayList迭代器遍历和for遍历对比
查看>>
如何用好碎片化时间,让思维更有效率?
查看>>
带WiringPi库的交叉笔译如何处理二之软链接概念
查看>>
Java8 HashMap集合解析
查看>>
自定义 select 下拉框 多选插件
查看>>
fastcgi_param 详解
查看>>
poj 1976 A Mini Locomotive (dp 二维01背包)
查看>>
剑指_复杂链表的复制
查看>>
MODULE_DEVICE_TABLE的理解
查看>>
No devices detected. Fatal server error: no screens found
查看>>
db db2_monitorTool IBM Rational Performace Tester
查看>>
postgresql监控工具pgstatspack的安装及使用
查看>>
【JAVA数据结构】双向链表
查看>>
【JAVA数据结构】先进先出队列
查看>>
谈谈加密和混淆吧[转]
查看>>
乘法逆元
查看>>
Objective-C 基础入门(一)
查看>>