编辑
2024-11-08
实用工具
00
请注意,本文编写于 146 天前,最后修改于 146 天前,其中某些信息可能已经过时。

目录

说明
下载并启动
项目使用
引入依赖
启动类
依赖配置
消息发送
消息接收
广播
创建topic
生产者配置
发送广播
消费者配置
接收广播

说明

RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。

Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架。它基于 SpringBoot 来创建具有生产级别的单机 Spring 应用,并且使用 Spring Integration 与 Broker 进行连接。

Spring Cloud Stream 提供了消息中间件配置的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。

Spring Cloud Stream 内部有两个概念:Binder 和 Binding。

  • Binder: 跟外部消息中间件集成的组件,用来创建 Binding,各消息中间件都有自己的 Binder 实现。

比如 Kafka 的实现 KafkaMessageChannelBinder,RabbitMQ 的实现 RabbitMessageChannelBinder 以及 RocketMQ 的实现 RocketMQMessageChannelBinder。

  • Binding: 包括 Input Binding 和 Output Binding。

Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。 文档

下载并启动

下载地址

启动 Name Server

sh bin/mqnamesrv

启动 Broker

sh bin/mqbroker -n localhost:9876

项目使用

引入依赖

<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> </dependency>

启动类

java
@SpringBootApplication @EnableBinding({ Source.class, Sink.class }) public class RocketMQApplication { public static void main(String[] args) { SpringApplication.run(RocketMQApplication.class, args); } }

依赖配置

yml
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876 # 使用 name 为 output 对应的 binding 发送消息到 test-topic 这个 topic。 spring.cloud.stream.bindings.output.destination=test-topic spring.cloud.stream.bindings.output.content-type=application/json # 使用2个 input binding 订阅数据。 # input1: 订阅 topic 为 test-topic 的消息,顺序消费所有消息(顺序消费的前提是所有消息都在一个 MessageQueue 中) spring.cloud.stream.bindings.input1.destination=test-topic spring.cloud.stream.bindings.input1.content-type=text/plain spring.cloud.stream.bindings.input1.group=test-group1 spring.cloud.stream.rocketmq.bindings.input1.consumer.orderly=true # input2: 订阅 topic 为 test-topic 的消息,异步消费 tags 为 tagStr 的消息,Consumer 端线程池个数为20 spring.cloud.stream.bindings.input2.destination=test-topic spring.cloud.stream.bindings.input2.content-type=text/plain spring.cloud.stream.bindings.input2.group=test-group2 spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=false spring.cloud.stream.rocketmq.bindings.input2.consumer.tags=tagStr spring.cloud.stream.bindings.input2.consumer.concurrency=20

消息发送

java
//使用 MessageChannel 进行消息发送: public class ProducerRunner implements CommandLineRunner { @Autowired private MessageChannel output; // 获取name为output的binding @Override public void run(String... args) throws Exception { Map<String, Object> headers = new HashMap<>(); headers.put(MessageConst.PROPERTY_TAGS, "tagStr"); Message message = MessageBuilder.createMessage(msg, new MessageHeaders(headers)); output.send(message); } } //或者使用 RocketMQ 原生的 API 进行消息发送: public class RocketMQProducer { DefaultMQProducer producer = new DefaultMQProducer("producer_group"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); Message msg = new Message("test-topic", "tagStr", "message from rocketmq producer".getBytes()); producer.send(msg); }

消息接收

java
//使用 @StreamListener 注解接收消息: @Service public class ReceiveService { @StreamListener("input1") public void receiveInput1(String receiveMsg) { System.out.println("input1 receive: " + receiveMsg); } @StreamListener("input2") public void receiveInput2(String receiveMsg) { System.out.println("input2 receive: " + receiveMsg); } }

广播

广播会发送消息给所有消费者。如果你想同一消费组下所有消费者接收到同一个topic下的消息,广播消费非常适合此场景。

创建topic

sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t broadcast

生产者配置

yml
server: port: 28085 spring: application: name: rocketmq-broadcast-producer-example cloud: stream: rocketmq: binder: name-server: localhost:9876 bindings: producer-out-0: producer: group: output_1 bindings: producer-out-0: destination: broadcast logging: level: org.springframework.context.support: debug

发送广播

java
//使用ApplicationRunner和StreamBridge发送消息。 @SpringBootApplication public class RocketMQBroadcastProducerApplication { private static final Logger log = LoggerFactory .getLogger(RocketMQBroadcastProducerApplication.class); @Autowired private StreamBridge streamBridge; public static void main(String[] args) { SpringApplication.run(RocketMQBroadcastProducerApplication.class, args); } @Bean public ApplicationRunner producer() { return args -> { for (int i = 0; i < 100; i++) { String key = "KEY" + i; Map<String, Object> headers = new HashMap<>(); headers.put(MessageConst.PROPERTY_KEYS, key); headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i); Message<SimpleMsg> msg = new GenericMessage<SimpleMsg>(new SimpleMsg("Hello RocketMQ " + i), headers); streamBridge.send("producer-out-0", msg); } }; } }

消费者配置

server: port: 28084 spring: application: name: rocketmq-broadcast-consumer1-example cloud: stream: function: definition: consumer; rocketmq: binder: name-server: localhost:9876 bindings: consumer-in-0: consumer: messageModel: BROADCASTING bindings: consumer-in-0: destination: broadcast group: broadcast-consumer logging: level: org.springframework.context.support: debug

接收广播

@SpringBootApplication public class RocketMQBroadcastConsumer1Application { private static final Logger log = LoggerFactory .getLogger(RocketMQBroadcastConsumer1Application.class); public static void main(String[] args) { SpringApplication.run(RocketMQBroadcastConsumer1Application.class, args); } @Bean public Consumer<Message<SimpleMsg>> consumer() { return msg -> { log.info(Thread.currentThread().getName() + " Consumer1 Receive New Messages: " + msg.getPayload().getMsg()); }; } }

本文作者:Weee

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!