Springboot集成Kafka,看这篇就够了。Kafka消息事务处理,同步异步分区拦截。

news/2024/5/9 22:27:28

一、基础配置引入

1. 依赖引入(二选一)

- pom.xml

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId>
</dependency>

- gradle

implementation("org.springframework.kafka:spring-kafka")

新建一个Spring Boot项目,在项目中引入如上依赖,默认情况下不填版本号(使用springboot父项目确定版本号)

2. yml配置文件

spring:kafka:bootstrap-servers: 223.122.137.55:9092producer: # 生产者retries: 3  #发送失败重试次数acks: all  #所有分区副本确认后,才算消息发送成功# 指定消息key和消息体的序列化编码方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.springframework.kafka.support.serializer.JsonSerializerconsumer: #消费者# 指定消息key和消息体的反序列化解码方式,与生产者序列化方式一一对应key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.springframework.kafka.support.serializer.JsonDeserializerproperties:  # 这个配置参数相对默认,会在下文中介绍spring:json:trusted:packages: '*'

注意:生产者的序列化器和消费者的反序列化器是成对出现的,也就是说生产者序列化value采用JSON的方式,消费者反序列化的时候也应该采用JSON的方式。

3. 生产与消费

生产与消费什么内容呢? 今年小麦是个热点,就小麦吧

public class Wheat {private String color;private String category;private double age;@Overridepublic String toString() {return "Wheat{" +"color='" + color + '\'' +", category='" + category + '\'' +", age=" + age +'}';}//这里省略了若干get、set方法
}

生产者测试用例

@SpringBootTest
class SpringKafkaTest {@ResourceKafkaTemplate<String, Wheat> kafkaTemplate;@Testvoid test() {Wheat wheat = new Wheat();wheat.setAge(0.2);wheat.setCategory("ck567");wheat.setColor("yello");//将wheat发往wheat-test这个topickafkaTemplate.send("wheat-test",wheat);}}
  • KafkaTemplate是Spring针对kafka生产者封装的模板操作类,可以使用泛型,上文中的<String,Wheat>表示发送的数据消息的key的数据类型是String,数据体value的数据类型是User。
  • 因为配置了value-serializer: org.springframework.kafka.support.serializer.JsonSerializer,所以User对象会被序列化为JSON对象之后发往kafka服务端。
  • 需要注意的是:在进行数据发送之前我并没有说需要在服务端新建一个主题“wheat-test”,这是因为,默认情况下当生产者发送数据的主题不存在的时候,会新建一个主题(该主题只有一个分区)。

消费者实现与测试

@Component
public class DemoConsumer {@KafkaListener(topics = "wheat-test" , groupId = "wheat-test-group")public void dealWheat(Wheat wheat) {System.out.println(wheat.toString());}
}
  • 核心注解是KafkaListener,topics指定了消费哪个主题的数据,gourpId指定了消费者组的名称
  • 这里使用Wheat作为方法参数,是因为kafka消费者会调用反序列化器value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer将生产者发送的Wheat对象反序列化。
  • 注意这里的消费者组只有一个消费者,如果希望启动多个消费者线程,可以设置@KafkaListener(concurrency=n)。(用法:消费者线程数=主题分区数)

二、生产者同步异步分区拦截

1. Send参数说明

在这里插入图片描述

在Spring KafkaTemplate的send()方法还支持其他参数,具体如下:

  • topic:Topic主题的的名称
  • partition:主题的分区编号,编号从0开始。表示消息数据指定发送到该分区中
  • timestamp:时间戳,一般默认当前时间戳
  • key:消息的键,可以是不同数据类型,但是通常是String。具有相同key的消息被发往同一个分区,也就是说具有相同key的消息可以保证数据有序性。
  • data:消息的数据,可以是不同数据类型
  • ProducerRecord:消息对应的封装类,包含上述字段,较少使用
  • Message:Spring自带的Message封装类,包含消息及消息头,较少使用

使用send方法的方式,发送之后就不再等待服务端对该消息的确认,如果出现异常生产者客户端不会有任何的感知。为了能够使生产者能够感知到消息是否真的发送成功了,有两种方式

  • 同步发送
  • 异步发送 + 回调函数

2. 异步发送

通过addCallback添加回调函数,success方法在消息发送被服务端确认成功后被调用;failure方法在消息发送失败后被调用。

@Test
public void testAsync() {Wheat wheat = new Wheat();wheat.setAge(0.2);wheat.setCategory("ck567");wheat.setColor("yello");kafkaTemplate.send("wheat-test", wheat).addCallback(success -> {// 消息发送到的topicString topic = success.getRecordMetadata().topic();// 消息发送到的分区int partition = success.getRecordMetadata().partition();// 消息在分区内的offsetlong offset = success.getRecordMetadata().offset();System.out.println("发送消息成功:" + topic + ",分区:" + partition + ",偏移量:" + offset);}, failure -> {System.out.println("发送消息失败:" + failure.getMessage());});
}

3. 同步发送

默认情况下send()方法就是异步调用的方法,如果想实现同步阻塞的方法,需要在send方法的基础上调用get()方法。get()无参方法有一个重载方法get(long timeout, TimeUnit unit),当超过一定的时长服务端仍无消息写入成功确认,则抛出TimeoutException异常。

@Test
public void testSync() {try {Wheat wheat = new Wheat();wheat.setAge(0.2);wheat.setCategory("ck567");wheat.setColor("yello");// 同步发送消息SendResult<String, Wheat> sendResult = kafkaTemplate.send("wheat-test", wheat).get(2, TimeUnit.MINUTES);// 消息发送到的topicString topic = sendResult.getRecordMetadata().topic();// 消息发送到的分区int partition = sendResult.getRecordMetadata().partition();// 消息在分区内的offsetlong offset = sendResult.getRecordMetadata().offset();System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);} catch (InterruptedException | ExecutionException e) {System.out.println("发送消息失败:" + e.getMessage());} catch (TimeoutException e) {System.out.println("发送消息超时无响应:" + e.getMessage());}
}

4. 拦截器与分区配置

生产者在执行数据发送的时候,可以配置拦截器和分区器,在本专栏的之前的文章中我们已经介绍了二者的自定义方式。自定义完成之后,就是该如何使用的问题。如下:

# 可以在上面👆yml配置那里找到
properties:interceptor.classes: com.ck567.kafka.MyProducerInterceptorpartitioner.class: com.ck567.kafka.MyProducerPartitioner

注意拦截器和分区器在Spring看来属于不常用的配置属性,对于不常用的原生配置属性,spring全都放在properties下面进行配置。也就是说原生API中,通过Properties传递给生产者的属性,在这里全部都支持。

一个是classes、一个是class,分区器只能配置一个,拦截器可以配置多个。

三、Kafka事务处理

1. 场景模拟

我们使用kafkaTemplate.send向kafka发送数据,但是发送数据之后方法内部抛出了异常。假如我们的代码含义是下面的这样的

  • 用户订单支付,向kafka发送数据,为用户增加积分
  • 然后把用户的订单支付结果存入数据库

订单支付未成功,可能用户余额不足,抛出异常。但是向kafka发送的数据已经发出去了,这显然不是我们希望看到的。我们期望的结果是:订单支付成功和用户积分增加成功,要么都成功,要么都失败。

2. 事务处理

下面是带事务处理的kafka生产者代码

//带事务处理的发送方式
public void rightSend(){Order order = new Order();order.setCurrencyType("RMB");order.setCount(18);order.setStatus("success");// 声明事务:operations函数报错,消息就不会发出去。kafkaTemplate.executeInTransaction(operations -> {//数据发往kafkaoperations.send("order-test",order);//模拟后续业务处理发生了异常throw new RuntimeException("fail");});
}

注意: Spring提供了万能的@Transactional注解,是可以用来管理kafka事务的,但是需要针对kafka做额外的配置管理。

加之通常情况下,spring的注解用于数据库事务处理,如果再结合数据库多数据源、分布式事务相关的处理,很有可能会造成不可预知的问题。所以我建议使用上面这个方式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pgtn.cn/news/19.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈,一经查实,立即删除!

相关文章

手动安装nginx,ssl双证书引入。

最近购买了两个域名&#xff0c;目前还没有其他业务场景&#xff0c;就想都解析在一个服务器上&#xff0c;发现原来安装的nginx ssl版本太低&#xff0c;不能适配两个证书。就对这个进行了研究。 依赖库安装 1. 安装 gcc 环境 $ sudo yum -y install gcc gcc-c # nginx 编译…

jenkins流水线部署,项目一键部署到k8s,项目一键部署K8S

预操作 打通jenkins、gitlab凭证&#xff0c; 见下面这篇文章 https://blog.51cto.com/wzlinux/2160109 然后从这里拿到凭证ID 用以在后面的Jenkinsfile中进行配置。 在阿里云&#xff0c;拿到镜像仓库凭证&#xff0c;用于自动上传镜像。 Jenkinsfile 放在项目的根目录…

提交代码触发Jenkins流水线更新

提交哪个构建哪个 或者 某个指定分支提交才构建 jenkins自己的restful接口在权限认证上比较麻烦&#xff0c;所以我选择了Jenkins的插件generic-webhook-trigger 使用下来感觉没有选错 1. 安装jenkins插件generic-webhook-trigger 在系统配置中去安装插件。 2. 在Jenkinsfile…

java 生成grpc调用service

1. 安装protoc 这里以mac为例&#xff1a; brew install automake brew install libtool brew install protobuf检查 protoc --version2. 下载protoc-(java/go/kotlin)插件 到这里下载对应系统的exe文件 https://repo.maven.apache.org/maven2/io/grpc/protoc-gen-grpc-java…

从言行合一到知行合一

本篇记录突然的随想&#xff0c;偶尔停留 2021年只有两种人在写博客&#xff0c;一种是试图建立受众并从中获利的人&#xff0c;另一种是只想写出想法、而没有任何目标的人。 这两种人的行为都非常好。选择做你喜欢的事&#xff0c;坚持下去&#xff0c;它们最终都可以对他人…