Apache RocketMQ 是一款 低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列 RocketMQ 可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。
随着微服务架构的流行,服务之间的关系梳理非常重要。异步解耦可以降低服务之间的耦合程度,同时也能提高服务的吞吐量。 使用异步解耦的业务场景非常多,因为每个行业的业务都会不太一样,以一些比较通用的业务来说明相信大家都能理解。 比如电商行业的下单业务场景,以最简单的下单流程来说,下单流程如下:
不重要的操作可以交给rocketMq操作
削峰填谷指的是在大流量的冲击下,利用 RocketMQ 可以抗住瞬时的大流量,保护系统的稳定性,提升用户体验。 在电商行业,最常见的流量冲击就是秒杀活动了,利用 RocketMQ 来实现一个完整的秒杀业务还是与很多需要做的工作,不在本文的范围内,后面有机会可以单独跟大家聊聊。想告诉大家的是像诸如此类的场景可以利用 RocketMQ 来扛住高并发,前提是业务场景支持异步处理。
众所周知,分布式事务有 2PC,TCC,最终一致性等方案。其中使用消息队列来做最终一致性方案是比较常用的。 在电商的业务场景中,交易相关的核心业务一定要确保数据的一致性。通过引入消息队列 RocketMQ 版的分布式事务,既可以实现系统之间的解耦,又可以保证最终的数据一致性。
数据分发指的是可以将原始数据分发到多个需要使用这份数据的系统中,实现数据异构的需求。最常见的有将数据分发到 ES, Redis 中为业务提供搜索,缓存等服务。 除了手动通过消息机制进行数据分发,还可以订阅 Mysql 的 binlog 来分发,在分发这个场景,需要使用 RocketMQ 的顺序消息来保证数据的一致性。
常见的四种
RocketMQ 中无特性的消息。当没有特殊的业务场景,使用普通消息就够了。如果有特殊的场景,就可以使用特殊的消息类型,比如顺序,事务等。
消息发送方发送出去一条消息,会同步得到服务端返回的结果。
消息发送方发出去一条消息,不用等待服务端返回结果,可以接着发送下一条消息。发送方可以通过回调接口接收服务端响应,并处理响应结果。
消息发送方只负责发送消息,发送出去后就不管了,这种方式发送速度非常快,存在丢失消息的风险。
顺序消息是指生产者按照一定的先后顺序发布消息;消费者按照既定的先后顺序订阅消息,即先发布的消息一定会先被消费者接收到。 比如数据分发的场景,如果我们订阅了 Mysql 的 binlog 来进行数据异构。消息要是没有顺序,就会出现数据错乱问题。 比如新增一条 id=1 的数据,然后马上删除。这样就产生了两条消息。正常的消费顺序是先新增,然后删除,此时数据是没有的。如果消息没有顺序,删除的先被消费了,然后消费新增的,此时数据还在,没被删除掉,就会导致不一致。
定时消息是指消息具备定时发送的功能,当消息发送到服务端后,不会立即投递给消费者。而是要等到消息指定的时间后才会投递给消费者进行消费。 延迟消息也就是定时消息,定时消息是定在某个时间点进行发送,比如 2020-11-11 12:00:00 发送。 延迟消息一般是在当前发送时间的基础上延迟多久进行发送,比如当前时间是 2020-09-10 12:00:00,延迟 10 分钟,那么消息发送成功后将在 2020-09-10 12:10:00 进行投递给消费者。 定时消息可以在订单超时未支付自动取消等场景使用。
RocketMQ 提供类似 X/Open XA 的分布式事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致。 交互流程:
发送方首先发送半事务消息到 RocketMQ 服务端。
RocketMQ 服务端接收到消息,然后将消息持久化成功之后,向发送方返回 Ack 确认消息已经发送成功,此时消息为半事务消息,不会投递给消费方。
收到半事务消息的 Ack 后,发送方开始执行本地事务逻辑。
发送方根据本地事务执行结果向服务端提交二次确认,如果本地事务执行成则进行消息的 Commit,如果执行失败则进行消息的 Rollback,服务端收到 Commit 状态则将半事务消息标记为可投递,消费方最终将收到该消息;服务端收到 Rollback 状态则删除半事务消息,消费方将不会收到该消息。
如果出现意外情况,步骤 4 没有进行消息的二次确认,等待固定时间后服务端将对该消息发起消息回查。
发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半事务消息进行操作。
基础服务以及可视化安装请看这篇文章 pom配置
xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>rocketmq-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.1.1.RELEASE</version>
</dependency>
</dependencies>
</project>
application 配置
ymlrocketmq:
producer:
group: testProducer
name-server: 127.0.0.1:9876
启动类
java@SpringBootApplication
public class RocketMQApplication {
public static void main(String[] args) {
SpringApplication.run(RocketMQApplication.class, args);
}
}
生产者类
java
/**
* 生产者
*
*/
public class Producer {
public static void main(String[] args) throws Exception {
//创建一个生产者,指定生产者组为testProducer
DefaultMQProducer producer = new DefaultMQProducer("testProducer");
// 指定NameServer的地址
producer.setNamesrvAddr("127.0.0.1:9876");
// 第一次发送可能会超时,我设置的比较大
producer.setSendMsgTimeout(60000);
// 启动生产者
producer.start();
//创建一条消息
// topic为 MessageTopic
// tags 为 TagA
Message msg = new Message("MessageTopic", "TagA", "今天天气怎么样 ".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息并得到消息的发送结果,然后打印
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
// 关闭生产者
producer.shutdown();
}
}
消费者类
java
/**
* 消费者
*
*/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 通过push模式消费消息,指定消费者组
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Consumer");
// 指定NameServer的地址
consumer.setNamesrvAddr("127.0.0.1:9876");
// 订阅这个topic下的所有的消息
consumer.subscribe("MessageTopic", "*");
// 注册一个消费的监听器,当有消息的时候,会回调这个监听器来消费消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.printf("消费消息:%s", new String(msg.getBody()) + "\n");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
消息监听
java/**
* 监听rocketmq主题的消息
*
*/
@Component
@RocketMQMessageListener(consumerGroup = "Consumer", topic = "MessageTopic")
public class TopicListener implements RocketMQListener<String> {
@Override
public void onMessage(String msg) {
System.out.println("处理消息:" + msg);
}
}
本文作者:Weee
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!