spring boot RocketMQ 集成

spring boot RocketMQ 集成

RocketMQ学习

1.基本概念

RocketMQ是阿里巴巴团队使用java语言开发的一款分布式消息中间件,是一款低延迟,高可用,拥有海量消息堆积能力和灵活拓展性的消息队列。

rocketmq的官网:http://rocketmq.apache.org

gitee仓库:https://gitee.com/apache/rocketmq?_from=gitee_search

Apache RocketMQ开发者指南:https://www.itmuch.com/books/rocketmq/

RocketMQ主要由 Producer、Broker、Consumer 三部分组成。

1.1、Producer

  • 消息生产者,负责生产消息,一般由业务系统负责生产消息。

  • 发送消息到Broker服务器。

  • 发送方式:

    • 同步发送、
    • 异步发送、
    • 顺序发送、
    • 单向发送。
  • 同步和异步方式均需要Broker返回确认信息,单向发送不需要。

1.2、Consumer

  • 消息消费者,负责消费消息,一般是后台系统负责异步消费。
  • 从Broker服务器拉取消息。
  • 消费形式:
    • 拉取式消费、
    • 推动式消费。

1.3、Broker

  • Broker 在实际部署过程中对应一台服务器,
  • 一个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。
  • Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。

Topic: 消息的集合,一个主题有多条消息,每条消息只能属于某一个主题。

Broker Server:代理服务器,负责存储消息、转发消息。

Name Server:名字服务,生产者或消费者通过名字服务查找主题相应的Broker IP列表。

Message:消息,生产和消费数据的最小单位,每条消息必须属于一个主题。

Tag:标签,用于同一主题下区分不同类型的消息,可以根据不同业务目的在同一主题下设置不同标签。

2、架构设计


1 技术架构

RocketMQ架构上主要分为四部分:

  • Producer:支持分布式集群方式部署。
  • Consumer:支持分布式集群方式部署。
  • NameServer:
    • 是一个非常简单的Topic路由注册中心,支持Broker的动态注册与发现。
    • Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
    • 路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。
  • BrokerServer:
    • 主要负责消息的存储、投递和查询以及服务高可用保证。

2 工作流程

image

  1. 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
  2. Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
  3. 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
  4. Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
  5. Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

3、项目使用

官方例子:https://gitee.com/apache/rocketmq/blob/develop/docs/cn/RocketMQ_Example.md

3.1、Rocketmq下载

rocketmq的官网

1、下载zip 文件解压缩到本地磁盘中。例:D: ocketmq

2、启动服务,进入bin,先执行,mqnamesrv.cmd,再启动broker,通过命令行来指定端口。

mqbroker.cmd -n localhost:9876

3.2、监控平台下载

https://github.com/apache/rocketmq-dashboard

3.3、集成springboot项目

1、添加依赖

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>

2、添加配置信息

rocketmq:
  name-server: http://127.0.0.1:9876 #rocketmq服务地址
  producer:
    group: base_group_syncMsg
    send-message-timeout: 5000
    retry-times-when-send-failed: 2
    max-message-size: 4194304

4、测试例子
生产者

@RestController
@RequestMapping("/mq")
public class ProducerController {

    @Autowired
    private RocketMQTemplate mqTemplate;

    @RequestMapping("/send")
    public String testSend(String msg) {
        try {
            mqTemplate.convertAndSend("TopicTest", msg);
            return "success";
        } catch (Exception e) {
            e.printStackTrace();
            return "fail";
        }
    }
}

消费者

@Component
@RocketMQMessageListener(topic = "TopicTest", consumerGroup = "consumerGroupTest")
@Slf4j
public class ConsumeController implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.init("------- Consumer: {}", message);
    }
}

4、幂等性

RocketMQ无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。

4.1、什么是幂等性

对一个事情进行操作,这个操作可能会执行成百上千次,但是操作结果都是相同的,这就是幂等性。

4.2、常用解决方案

业界主流的幂等性有两种操作:

  • 唯一 ID + 指纹码 机制,利用数据库主键去重
    • 缺点,每次都要查询数据库
  • 利用redis的原子性去实现
    • 缺点:可能业务执行失败,但redis标识成功。

原文地址:https://www.cnblogs.com/galenblog/archive/2022/05/16/16277332.html