【Kafka学习】Spring Cloud Stream 集成 Kafka 示例
一、项目下载
下载地址:
⼆、配置Pom⽂件
<dependencies>
<!-- Web starter, added so a Controller can be used for manual testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Core jar: the Spring Cloud Stream Kafka binder -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
<version>3.0.1.RELEASE</version>
</dependency>
<!-- Unit testing -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
三、配置 application.yml
# HTTP server settings for the demo application
server:
  port: 2021
  servlet:
    context-path: /
  tomcat:
    uri-encoding: UTF-8

spring:
  cloud:
    stream:
      # Kafka binder connection settings
      kafka:
        binder:
          brokers: 127.0.0.1:9092
          # auto-create-topics: true
      # One entry per channel name declared via @Input/@Output.
      # Both bindings use the same topic but different consumer groups.
      bindings:
        channel-name-1:
          destination: topic-name
          content-type: application/json
          group: oms-group-1
        channel-name-2:
          destination: topic-name
          content-type: application/json
          group: oms-group-2
四、⽣产者代码
/**
 * Output-side channel contract for order-management (OMS) messages.
 *
 * <p>Declares two output channels. The value given to each {@code @Output}
 * is the channel name that the configuration file lists under
 * {@code bindings}.
 *
 * @author liuxx
 * @date 2021/12/9 16:48
 */
public interface OmsSource {

    /** Name of the first output channel. */
    String CHANNEL_NAME_1 = "channel-name-1";

    /** Name of the second output channel. */
    String CHANNEL_NAME_2 = "channel-name-2";

    /**
     * @return the message channel registered under {@link #CHANNEL_NAME_1}
     */
    @Output(CHANNEL_NAME_1)
    MessageChannel outputChannel1();

    /**
     * @return the message channel registered under {@link #CHANNEL_NAME_2}
     */
    @Output(CHANNEL_NAME_2)
    MessageChannel outputChannel2();
}
package;
import Logger;
import LoggerFactory;
import EnableBinding;
import MessageBuilder;
import Component;
import Resource;
/**
* @author liuxx
* @date 2021/12/9 16:58
* @desc 发送订单相关消息
*/
@Component
@EnableBinding(OmsSource.class)
public class OmsProducer {
@Resource
private OmsSource omsSource;
private Logger logger = Logger(OmsProducer.class);
public void outputChannel1(String msg){
boolean successful = omsSource.outputChannel1().send(MessageBuilder.withPayload(msg).build());        logger.info("给发送OMS的Channel-1消息 {}", successful ?"成功":"失败");
}
public void outputChannel2(String msg){
boolean successful = omsSource.outputChannel1().send(MessageBuilder.withPayload(msg).build());        logger.info("给发送OMS的Channel-2消息 {}", successful ?"成功":"失败");
}
}
五、消费者代码
/**
 * Input-side channel contract for order-management (OMS) messages.
 *
 * <p>Declares two subscribable input channels. The value given to each
 * {@code @Input} is the channel name that the configuration file lists
 * under {@code bindings}.
 *
 * @author liuxx
 * @date 2021/12/9 16:45
 */
public interface OmsSink {

    /** Name of the first input channel. */
    String CHANNEL_NAME_1 = "channel-name-1";

    /** Name of the second input channel. */
    String CHANNEL_NAME_2 = "channel-name-2";

    /**
     * @return the subscribable channel registered under {@link #CHANNEL_NAME_1}
     */
    @Input(CHANNEL_NAME_1)
    SubscribableChannel inputChannel1();

    /**
     * @return the subscribable channel registered under {@link #CHANNEL_NAME_2}
     */
    @Input(CHANNEL_NAME_2)
    SubscribableChannel inputChannel2();
}
package;
import Logger;
import LoggerFactory;
import EnableBinding; import StreamListener; import Component;
/
**
* @author liuxx
* @date 2021/12/9 16:59
* @desc 消费订单相关消息
*/
@Component
@EnableBinding(OmsSink.class)
public class OmsConsumer {
private Logger logger = Logger(OmsConsumer.class);
@StreamListener(OmsSink.CHANNEL_NAME_1)
public void inputChannel1(String message){
logger.info("开始消费信道 1 消息:{}", message);
}
@StreamListener(OmsSink.CHANNEL_NAME_2)
public void inputChannel2(String message){
logger.info("开始消费信道 2 消息:{}", message);
}
}
六、消息测试
import OmsProducer;
import GetMapping;
import PathVariable;
import RequestMapping;
import RestController;
import Resource;
/**
 * Test controller that forwards a path-variable message to {@link OmsProducer}.
 *
 * @author liuxx
 * @date 2021/12/9 16:58
 */
@RestController
@RequestMapping("/kafka")
public class KafkaMessageController {

    @Resource
    private OmsProducer omsProducer;

    /**
     * Sends {@code msg} via the producer's channel 1.
     *
     * @param msg the message taken from the URL path
     * @return the same message, echoed back to the caller
     */
    @GetMapping("/send/1/{msg}")
    public String sendChannel1(@PathVariable String msg) {
        omsProducer.outputChannel1(msg);
        return msg;
    }

    /**
     * Sends {@code msg} via the producer's channel 2.
     *
     * @param msg the message taken from the URL path
     * @return the same message, echoed back to the caller
     */
    @GetMapping("/send/2/{msg}")
    public String sendChannel2(@PathVariable String msg) {
        // Route to channel 2: the previous code called outputChannel1() here,
        // so /send/2 behaved exactly like /send/1.
        omsProducer.outputChannel2(msg);
        return msg;
    }
}
七、测试
localhost:2021/kafka/send/1/{"key":"value"}
结果:
说明:这里使用了两个 channel 做测试,因为同一条消息在同一个项目里可能有多个业务需要消费。需要特别注意的是:这些消费信道在配置文件中的 group 不能相同。
⼋、问题
1.同⼀个项⽬中同⼀个Topic,配置多个消费信道时,错误配置(Group配置相同):
Exception thrown while starting consumer:
BinderException: Exception thrown while starting consumer:
at AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:471)~[spring-cloud-stream-3.0.1.RELEASE.jar:3.0.1.RELEASE]
at AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:90)~[spring-cloud-stream-3.0.1.RELEASE.jar:3.0.1.RELEASE]
at AbstractBinder.bindConsumer(AbstractBinder.java:143)~[spring-cloud-stream-3.0.1.RELEASE.jar:3.0.1.RELEASE]
at BindingService.lambda$rescheduleConsumerBinding$0(BindingService.java:194)~[spring-cloud-stream-3.0.1.RELEASE.jar:3.0.1.RELEASE]
at DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)~[spring-context-5.2.13.RELEASE.jar:5.2.13.RELEASE]
at java.base/Executors$RunnableAdapter.call(Executors.java:515)~[na:na]
at java.base/FutureTask.run(FutureTask.java:264)~[na:na]
at java.base/ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)~[na:na]
at java.base/ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)~[na:na]
at java.base/ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)~[na:na]
at java.base/Thread.run(Thread.java:834)~[na:na]
Caused by:BeanDefinitionOverrideException: Invalid bean definition s.recoverer' defined in null: Cannot register bean definition [Root bean:class[ErrorMessageSendingRecoverer]; scope=;abstract=false; lazyInit=null; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null]for bean 's.recoverer': There is already [Root bean:class[ErrorMessageSendingRecoverer]; scope=;abstract=false; lazyInit=null; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=null; factoryMethodName=null; initMethodName=null; destroyMethodName=null] bound.
at DefaultListableBeanFactory.registerBeanDefinition(DefaultListableBeanFactory.java:945)~[spring-beans-5.2.13.RELEASE.jar:5.2.13.RELEASE]
at GenericApplicationContext.registerBeanDefinition(GenericApplicationContext.java:323)~[spring-context-5.2.13.RELEASE.jar:5.2.13.RELEASE]
at GenericApplicationContext.registerBean(GenericApplicationContext.java:471)~[spring-context-5.2.13.RELEASE.jar:5.2.13.RELEASE]
at AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:691)~[spring-cloud-stream-3.0.1.RELEASE.jar:3.0.1.RELEASE]
at AbstractMessageChannelBinder.registerErrorInfrastructure(AbstractMessageChannelBinder.java:643)~[spring-cloud-stream-3.0.1.RELEASE.jar:3.0.1.RELEASE]
at KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:642)~[spring-cloud-stream-binder-kafka-3.0.1.RELEASE.jar:3.0.1.RELEASE]
at KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:147)~[spring-cloud-stream-binder-kafka-3.0.1.RELEASE.jar:3.0.1.RELEASE]
at AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:417)~[spring-cloud-stream-3.0.1.RELEASE.jar:3.0.1.RELEASE]
...10 common frames omitted
2.注意分开Source和Sink
亲身经历:我们在实际使用时,用同一个类同时定义了生产者和消费者,结果被其他开发者误用了(消费时未配置 @Input,也未声明对应的 Bean,而是直接使用 @StreamListener 消费对应 Topic,导致其他项目中不同消费者 Group 的消费者消费不到消息)。
这⾥我们⽤上⾯的Demo举例:
接着重复测试:
localhost:2021/kafka/send/1/{"key":"value"}
结果:只有错误配置的channel收到了消息,另⼀个消费者没有收到
所以我们实际使⽤时,最好分开定义
3.Kafka Rebalance