rabbitmq 使用SAC队列实现顺序消息

rabbitmq 使用SAC队列实现顺序消息

前提

SAC: single active consumer, 是指如果有多个实例,只允许其中一个实例消费,其他实例为空闲

目的

实现消息顺序消费,操作:

  • 创建4个SAC队列,
  • 消息的路由key 取队列个数模,这里是4
  • 发送消息到每个队列,保证每个队列只有一个消费者!!

实现

定义消息 SeqMessage
@Data
@AllArgsConstructor
public class SeqMessage implements Serializable {

    //消息id
    private String requestNo;
    //消息中顺序,1,2,3,4
    private int order;
}
创建 队列 绑定
@Configuration
public class OrderQueueConfiguration {

    public static final String EXCHANGE = "order-ex";
    public static final String RK_PREFIX = "rk-";
    public static final String ONE_QUEUE = "one-queue";
    public static final String TWO_QUEUE = "two-queue";
    public static final String THREE_QUEUE = "three-queue";
    public static final String FOUR_QUEUE = "four-queue";

    @Bean
    public DirectExchange exchange() { // 使用直连的模式
        return new DirectExchange(EXCHANGE, true, false);
    }

    @Bean
    public Binding oneBinding() {
        return BindingBuilder.bind(oneQueue()).to(exchange()).with(RK_PREFIX + 1);
    }
    @Bean
    public Binding twoBinding() {
        return BindingBuilder.bind(twoQueue()).to(exchange()).with(RK_PREFIX + 2);
    }
    @Bean
    public Binding threeBinding() {
        return BindingBuilder.bind(threeQueue()).to(exchange()).with(RK_PREFIX + 3);
    }
    @Bean
    public Binding fourBinding() {
        return BindingBuilder.bind(fourQueue()).to(exchange()).with(RK_PREFIX + 3);
    }


    @Bean
    public Queue oneQueue() {
        return createSacQueue(ONE_QUEUE);
    }

    @Bean
    public Queue twoQueue() {
        return createSacQueue(TWO_QUEUE);
    }

    @Bean
    public Queue threeQueue() {
        return createSacQueue(THREE_QUEUE);
    }

    @Bean
    public Queue fourQueue() {
        return createSacQueue(FOUR_QUEUE);
    }

    private static Queue createSacQueue(String queueName) {
        Map<String, Object> arguments = new HashMap<>(2);
        arguments.put("x-single-active-consumer", true);
        return new Queue(queueName, true, false, false, arguments);
    }

}

重要的是 x-single-active-consumer 这个属性, 只有一个实例生效

在这里插入图片描述

创建 消费者

为每个队列创建一个监听消费者

@Slf4j
@Component
public class OrderListener {


    @RabbitListener(bindings = @QueueBinding(
                    exchange = @Exchange(value = EXCHANGE,declare = "false"),
                    value = @Queue(value = ONE_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 1))
    public void onMessage1(Message message, @Headers Channel channel) {
        String messageStr = "";
        try {
            messageStr = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("{} recv: {}", ONE_QUEUE, messageStr);
        } catch (Exception e) {
            log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);
        }
    }

    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = EXCHANGE,declare = "false"),
            value = @Queue(value = TWO_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 2))
    public void onMessage2(Message message, @Headers Channel channel) {
        String messageStr = "";
        try {
            messageStr = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("{} recv: {}", TWO_QUEUE, messageStr);
        } catch (Exception e) {
            log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);
        }
    }
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = EXCHANGE,declare = "false"),
            value = @Queue(value = THREE_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 3))
    public void onMessage3(Message message, @Headers Channel channel) {
        String messageStr = "";
        try {
            messageStr = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("{} recv: {}", THREE_QUEUE, messageStr);
        } catch (Exception e) {
            log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);
        }
    }

    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = EXCHANGE,declare = "false"),
            value = @Queue(value = FOUR_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 4))
    public void onMessage4(Message message, @Headers Channel channel) {
        String messageStr = "";
        try {
            messageStr = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("{} recv: {}", FOUR_QUEUE, messageStr);
        } catch (Exception e) {
            log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);
        }
    }

}
生产者发送消息
@GetMapping("/send/seq/messqge")
 public String sendSeqMessage() throws JsonProcessingException {
     int cnt = 100;
     int mod = 4;
     int seqSize = 6;
     for (int i = 0; i < cnt; i++) {
         for (int j = 0; j < seqSize; j++) {
             int rk = i % mod + 1;
             SeqMessage seqMessage = new SeqMessage("seq-" + i, j);
             String s = objectMapper.writeValueAsString(seqMessage);
             log.info("routeKey: {}, send msg: {}", rk, s);
             rabbitTemplate.convertAndSend(EXCHANGE, RK_PREFIX + rk, s);
         }
     }
     return "success";
 }

运行结果:

two-queue recv: {"requestNo":"seq-1","order":0}
two-queue recv: {"requestNo":"seq-1","order":1}
two-queue recv: {"requestNo":"seq-1","order":2}
two-queue recv: {"requestNo":"seq-1","order":3}
two-queue recv: {"requestNo":"seq-1","order":4}
two-queue recv: {"requestNo":"seq-1","order":5}
two-queue recv: {"requestNo":"seq-5","order":0}
two-queue recv: {"requestNo":"seq-5","order":1}
two-queue recv: {"requestNo":"seq-5","order":2}
two-queue recv: {"requestNo":"seq-5","order":3}
two-queue recv: {"requestNo":"seq-5","order":4}
two-queue recv: {"requestNo":"seq-5","order":5}

three-queue recv: {"requestNo":"seq-2","order":0}
three-queue recv: {"requestNo":"seq-2","order":1}
three-queue recv: {"requestNo":"seq-2","order":2}
three-queue recv: {"requestNo":"seq-2","order":3}
three-queue recv: {"requestNo":"seq-2","order":4}
three-queue recv: {"requestNo":"seq-2","order":5}
three-queue recv: {"requestNo":"seq-6","order":0}
three-queue recv: {"requestNo":"seq-6","order":1}
three-queue recv: {"requestNo":"seq-6","order":2}
three-queue recv: {"requestNo":"seq-6","order":3}
three-queue recv: {"requestNo":"seq-6","order":4}
three-queue recv: {"requestNo":"seq-6","order":5}

可以发现,消息消费是顺序的

good luck!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/559018.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

java调用讯飞星火认知模型

前往讯飞开发平台选择产品&#xff0c;获取appId、apiKey、APISecret&#xff0c;这里我选择的是v3.0模型。 java后端实现 本项目以及实现了基本的会话功能&#xff0c;小伙伴可以自己扩充其他的例如绘画功能。 注意&#xff1a;星火模型的api使用的是websocket协议&#xf…

C++11(下篇)

文章目录 C111. 模版的可变参数1.1 模版参数包的使用 2. lambda表达式2.1 Lambda表达式语法捕获列表说明 2.2 lambda的底层 3. 包装器3.1 function包装器3.2 bind 4. 线程库4.1 thread类4.2 mutex类4.3 atomic类4.4 condition_variable类 C11 1. 模版的可变参数 C11支持模版的…

数据结构习题-- 相交链表

数据结构习题-- 相交链表 给你两个单链表的头节点 headA 和 headB &#xff0c;请你找出并返回两个单链表相交的起始节点。如果两个链表不存在相交节点&#xff0c;返回 null 如上图&#xff0c;返回c1结点 注意&#xff1a;这两个链表非环形 方法&#xff1a;集合 分析 由…

关于ERA5气压和温度垂直补偿公式的对比情况

1. 气压和温度垂直补偿对比 「谨代表给个人观点&#xff0c;杠精请自测&#xff0c;对对对&#xff0c;好好好&#xff0c;你说啥都对」。 使用2020-2022陆态网GNSS与探空站并址的48个站点实验&#xff0c;以探空站为真值&#xff0c;验证ERA5精度。怎么确定并址请看前面文章…

Django中的实时通信:WebSockets与异步视图的结合

&#x1f47d;发现宝藏 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。【点击进入巨牛的人工智能学习网站】。 在现代Web应用程序中&#xff0c;实时通信已经成为了必不可少的功能之一。无论是在线聊天、…

UKP3D,出轴 /平面图时,选项中出图比例,绘图比例,打印比例的区别

Q:用户问&#xff0c;轴测图正常&#xff0c;平面图位置不对&#xff0c;这个也需要在xml里面调整吗&#xff1f; 在此&#xff0c;先不回复上述问题&#xff0c;而是解释在出图规则里的选项意思。 1.图框比例&#xff1a;图框比例1:100&#xff0c;例如选中的图幅是A0横式&…

现代图形API综合比较:Vulkan | DirectX | Metal | WebGPU

Vulkan、DirectX、Metal 和 WebGPU 等低级图形 API 正在融合为类似于当前 GPU 构建方式的模型。 图形处理单元 (GPU) 是异步计算单元&#xff0c;可以处理大量数据&#xff0c;例如复杂的网格几何形状、图像纹理、输出帧缓冲区、变换矩阵或你想要计算的任何数据。 NSDT工具推荐…

TFS(淘宝分布式存储引擎

TFS&#xff08;淘宝分布式存储引擎&#xff09; 什么是TFS&#xff1f; ​ 根据淘宝2016年的数据分析&#xff0c;淘宝卖家已经达到900多万&#xff0c;有上十亿的商品。每一个商品有包括大量的图片和文字(平均&#xff1a;15k)&#xff0c;粗略估计下&#xff0c;数据所占的…

编译一个基于debian/ubuntu,centos,arhlinux第三方系统

目录 前言 准备工作 下载linux源码进行编译 linux源码下载 网站 问题 解决办法 编译 可能会遇到的问题 chroot下载debian环境 进入虚拟环境 把chroot的根目录文件打包为.gz文件 编译init文件&#xff08;用于系统启动时的一系列引导&#xff09; 给予文件夹权限 …

pip下载包opencv出错(报错failed building wheel for opencv-python解决方法)

文章目录 1 报错2 原因3 解决方法参考 1 报错 ERROR: Could not build wheels for opencv-python, which is required to install pypr2 原因 版本不兼容的问题,当使用pip install opencv-python命令安装的是最新版本&#xff0c;当前python版本不支持。需要安装当前版本pyth…

34. 【Android教程】菜单:Menu

作为 Android 用户&#xff0c;你一定见过类似这样的页面&#xff1a; 它就是我们今天的主角——菜单&#xff0c;它的使用场景和作用不用多说&#xff0c;几乎每个 App 都会用到它&#xff0c;今天我们就一起来看看 Android 提供的几种菜单类型及用法。 1. 菜单的几种类型 根…

✌粤嵌—2024/4/12—插入区间✌

代码实现&#xff1a; 解题思路&#xff1a;先将数组 newInterval 插入到数组 intervals 的末尾&#xff0c;再转换成合并区间 /*** Return an array of arrays of size *returnSize.* The sizes of the arrays are returned as *returnColumnSizes array.* Note: Both returne…

LabVIEW专栏六、LabVIEW项目

一、梗概 项目&#xff1a;后缀是.lvproj&#xff0c;在实际开发的过程中&#xff0c;一般是要用LabVIEW中的项目来管理代码&#xff0c;也就是说相关的VI或者外部文件&#xff0c;都要放在项目中来管理。 在LabVIEW项目中&#xff0c;是一个互相依赖的整体&#xff0c;可以包…

319_C++_使用QT自定义的对话框,既能选择文件也能选择文件夹,为什么使用QListView和QTreeView来达成目的?

解析 1: 在 Qt 中,QFileDialog::setOption 方法用于设置文件对话框的一些选项,以改变其行为或外观。QFileDialog::DontUseNativeDialog 是这些选项之一,当设置为 true 时,它会告诉 QFileDialog 不要使用操作系统提供的原生文件对话框,而是使用 Qt 自己实现的对话框样式。…

js作业微博发言与轮播(有瑕疵)

微博 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><meta http-equiv"X-UA-Compatible" content&q…

H5 台球猜位置小游戏

刷到抖音有人这样玩&#xff0c;就写了一个这样的小游戏练习一下H5的知识点。 小游戏预览 w(&#xff9f;Д&#xff9f;)w 不开挂越急越完成不了&#xff0c;&#x1f47f;确认15次也没全对… 知识点 获取坐标位置的DOM元素&#xff0c;感觉应该是新的吧&#xff0c;以前的…

Aws Nat Gateway

要点 NAT网关要能访问外网&#xff0c;所以需要部署在有互联网网关的Public子网中。 关键&#xff1a; NAT网关创建是选择子网&#xff0c;一定要选择公有子网&#xff08;有互联网网关子网&#xff09; 特别注意&#xff1a; 新建nat网关的时候&#xff0c;选择的子网一定…

jeecgflow之camunda工作流-并行网关

引言 书接上回&#xff0c;继续三国流程系列教程。 本文主要讲解并行网关。 并行网关允许流程中的多个任务同时执行&#xff0c;从而提高流程的执行效率。 并行网关会忽视序列流上的条件设置。 并行网关分为两部分。 Fork: 用于任务开始 Join:用于任务结束 体验文章demo演示站…

【Camera Framework笔记】二、Camera Native Framework架构①

一、总体架构&#xff1a; service -> opencamera -> client&#xff08;api1/api2&#xff09; -> device3&#xff08;hal3&#xff09; | | &#xff08;不opencamera…

kubernetes学习

1、应用部署方式演变 2、kubernetes介绍 3、kubernetes组件 4、kubernetes概念 5、环境搭建-环境规划 集群类型&#xff1a; 安装方式&#xff1a; 主机规划&#xff1a; 6、环境搭建-主机安装 使用虚拟机安装三台centos7&#xff08;一主二从&#xff09;&#xff0c;然后在…
最新文章