首页 程序笔记 Java中如何确保RocketMQ不丢失消息?

Java中如何确保RocketMQ不丢失消息?

引入消息队列可以方便地实现系统解耦、削峰填谷等作用。但是消息队列使用不当,可能会引起消息丢失,在一些消息敏感的业务场景下,这是不允许的。今天我们使用一些Java示例来聊一聊 RocketMQ 怎么做能确保消息不丢失。

一、RocketMQ 简介

RocketMQ 是阿里巴巴开源的分布式消息中间件,整体架构如下图:

RocketMQ 主要包括 Producer、Consumer 和 Broker,同时 Name Server 进行集群注册管理和保存元数据。

二、RocketMQ消息不丢失

要想保证消息不丢失,需要从以下几个方面考虑:

Producer 发送消息 Broker 保存消息 Consumer 消费消息 Broker 主从切换

1、同步发送

代码如下:

public void send() throws Exception {
    String message = "test producer";
    Message sendMessage = new Message("topic1", "tag1", message.getBytes());
    sendMessage.putUserProperty("name1","value1");
    SendResult sendResult = null;


    DefaultMQProducer producer = new DefaultMQProducer("testGroup");
    producer.setNamesrvAddr("localhost:9876");
    producer.setRetryTimesWhenSendFailed(3);
    try {
        sendResult = producer.send(sendMessage);
    } catch (Exception e) {
        e.printStackTrace();
    }
    if (sendResult != null) {
        System.out.println(sendResult.getSendStatus());
    }
}

同步发送会返回 4 个状态码:

SEND_OK:消息发送成功。需要注意的是,消息发送到 broker 后,还有两个操作:消息刷盘和消息同步到 slave 节点,默认这两个操作都是异步的,只有把这两个操作都改为同步,SEND_OK 这个状态才能真正表示发送成功。 FLUSH_DISK_TIMEOUT:消息发送成功但是消息刷盘超时。 FLUSH_SLAVE_TIMEOUT:消息发送成功但是消息同步到 slave 节点时超时。 SLAVE_NOT_AVAILABLE:消息发送成功但是 broker 的 slave 节点不可用。

根据返回的状态码,可以做消息重试,这里设置的重试次数是 3。

消息重试时,消费端一定要做好幂等处理。

2、异步发送

代码如下:

public void sendAsync() throws Exception {
    String message = "test producer";
    Message sendMessage = new Message("topic1", "tag1", message.getBytes());
    sendMessage.putUserProperty("name1","value1");


    DefaultMQProducer producer = new DefaultMQProducer("testGroup");
    producer.setNamesrvAddr("localhost:9876");
    producer.setRetryTimesWhenSendFailed(3);
    producer.send(sendMessage, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            
        }


        @Override
        public void onException(Throwable e) {
            // TODO 可以在这里加入重试逻辑
        }
    });
}

异步发送,可以重写回调函数,回调函数捕获到 Exception 时表示发送失败,这时可以进行重试,这里设置的重试次数是 3。

3、刷盘策略

1)异步刷盘

默认。消息写入 CommitLog 时,并不会直接写入磁盘,而是先写入 PageCache 缓存后返回成功,然后用后台线程异步把消息刷入磁盘。异步刷盘提高了消息吞吐量,但是可能会有消息丢失的情况,比如断点导致机器停机,PageCache 中没来得及刷盘的消息就会丢失。

2)同步刷盘

消息写入内存后,立刻请求刷盘线程进行刷盘,如果消息未在约定的时间内(默认 5 s)刷盘成功,就返回 FLUSH_DISK_TIMEOUT,Producer 收到这个响应后,可以进行重试。同步刷盘策略保证了消息的可靠性,同时降低了吞吐量,增加了延迟。要开启同步刷盘,需要增加下面配置:

flushDiskType=SYNC_FLUSH

4、Broker 多副本和高可用

Broker 为了保证高可用,采用一主多从的方式部署。如下图:

消息发送到 master 节点后,slave 节点会从 master 拉取消息保持跟 master 的一致。这个过程默认是异步的,即 master 收到消息后,不等 slave 节点复制消息就直接给 Producer 返回成功。

这样会有一个问题,如果 slave 节点还没有完成消息复制,这时 master 宕机了,进行主备切换后就会有消息丢失。为了避免这个问题,可以采用 slave 节点同步复制消息,即等 slave 节点复制消息成功后再给 Producer 返回发送成功。只需要增加下面的配置:

brokerRole=SYNC_MASTER

改为同步复制后,消息复制流程如下:

slave 初始化后,跟 master 建立连接并向 master 发送自己的 offset; master 收到 slave 发送的 offset 后,将 offset 后面的消息批量发送给 slave; slave 把收到的消息写入 commitLog 文件,并给 master 发送新的 offset; master 收到新的 offset 后,如果 offset >= producer 发送消息后的 offset,给 Producer 返回 SEND_OK。

5、消息确认

Consumer 消费消息的代码如下:

public void consume() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.setMessageModel(MessageModel.CLUSTERING);
    consumer.subscribe("topic1", "tag1");
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
        try{
            System.out.printf("Receive New Messages: %s", msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }catch (Exception e){
            e.printStackTrace();
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    });
    consumer.start();
}

如果 Consumer 消费成功,返回 CONSUME_SUCCESS,提交 offset 并从 Broker 拉取下一批消息。

6、Consumer 重试

Consumer 消费失败,这里有 3 种情况:

返回 RECONSUME_LATER 返回 null 抛出异常

Broker 收到这个响应后,会把这条消息放入重试队列,重新发送给 Consumer。

注意:

Broker 默认最多重试 16 次,如果重试 16 次都失败,就把这条消息放入死信队列,Consumer 可以订阅死信队列进行消费。 重试只有在集群模式(MessageModel.CLUSTERING)下生效,在广播模式(MessageModel.BROADCASTING)下是不生效的。 Consumer 端一定要做好幂等处理。

其实重试 3 次都失败就可以说明代码有问题,这时 Consumer 可以把消息存入本地,给 Broker 返回CONSUME_SUCCESS 来结束重试。代码如下:

int count = ((MessageExt) msgs).getReconsumeTimes();

if (count > 2) {

    //TODO 把消息写入本地存储

    System.out.println("重试次数超过3次");

    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

7、事务消息

RocketMQ支持事务消息,整体流程如下图:

Producer 发送 half 消息; Broker 先把消息写入 topic 是 RMQ_SYS_TRANS_HALF_TOPIC 的队列,之后给 Producer 返回成功; Producer 执行本地事务,成功后给 Broker 发送 commit 命令(本地事务执行失败则发送 rollback); Broker 收到 commit 请求后把消息状态更改为成功并把消息推到真正的 topic; Consumer 拉取消息进行消费。

代码如下:

public class ProducerTransactionListenerImpl implements TransactionListener {


    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        /**
         * 这里执行本地事务,执行成功返回LocalTransactionState.COMMIT_MESSAGE,执行失败返回
         * LocalTransactionState.ROLLBACK_MESSAGE,如果返回LocalTransactionState.UNKNOW,
         * Broker会回来查询,所以需要记录事务执行状态
         */
        return LocalTransactionState.COMMIT_MESSAGE;
    }


    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        /**
         * 这里查询事务执行状态,根据事务状态返回LocalTransactionState.COMMIT_MESSAGE或
         * LocalTransactionState.ROLLBACK_MESSAGE,如果没有查询到返回LocalTransactionState.UNKNOW,
         * Broker会再次查询,可以记录查询次数,超过次数后返回ROLLBACK_MESSAGE
         */
        return LocalTransactionState.UNKNOW;
    }
}

8、消息索引

我们知道,RocketMQ 核心的数据文件有 3 个:CommitLog、ConsumeQueue 和 Index。其中Index 文件就是一个索引文件,结构如下图:

查找消息时,首先根据消息 key 的 hashcode 计算出 Hash 槽的位置,然后读取 Hash 槽的值计算 Index 条目的位置,从Index 条目位置读取到消息在 CommitLog 文件中的 offset,从而查找到消息。

在 Producer 发送消息时,可以指定一个 key,代码如下:

Message sendMessage = new Message("topic1", "tag1", message.getBytes());

sendMessage.setKeys("weiyiid");

这样可以通过 RocketMQ 提供的命令或者管理控制台来查询消息是否发送成功。

9、极端情况

如果对消息丢失零容忍,我们必须要考虑极端情况,比如整个 RocketMQ 集群挂了,这时 Producer 端发送消息一定会失败,可以考虑在 Producer 端做降级,把要发送的消息保存到本地数据库或磁盘,等 RocketMQ 恢复以后再把本地消息推送出去。

三、总结

在一些特殊的业务场景,比如支付、银行核算等,需要确保消息不丢失,但是同时也要看到,消息不丢失的方案会大大降低 RocketMQ 的吞吐量,需要综合考虑。

3

站心网

引入消息队列可以方便地实现系统解耦、削峰填谷等作用。但是消息队列使用不当,可能会引起消息丢失,在一些..

为您推荐

写给那些想要自学成才的java程序员

自学java没那么难一:个人经历我的大学:第一年泡在图书馆看杂七杂八的书,跟学习毫无关系。第二年疯狂打LOL,从白银打到黄铜(黄铜守门员)。第三年上半年,被某人点醒,学习了整套C#知识体系,某马。下半年又决定..

跳槽!Java面试经验总结

0.前言笔者在不足两年经验的时候从成都一家金融科技中厂跳槽到杭州阿里淘天集团,又于今年5月份从杭州淘天跳槽到成都字节。自认为自己在面试这方面有一点心得,处于记录和分享的目的便有了此文,此文纯主观,也许对3..

Java中String类常见的方法

以下介绍字符串常见的几个方法。介绍String类在 Java 中,String类是一个代表字符串的类,具有以下特性:不可变性:String对象一旦被创建就是不可变的,即它们的值在创建后不能被更改。任何对String对象的修改操作实..

使用SuperWebSocket实现Web消息推送

在大部分Web系统中,我们可能遇到需要向客户端推送消息的需求。SuperWebSocket第三方库能让我们轻松的完成任务。SuperWebSocket第三方库可以从网上下载,不过通过Visual Studio Nuget安装更快。引用SuperWebSocket相..

ASP.NET如何将Views文件夹从项目分离

将 Views 文件夹从 ASP.NET 项目中分离是一个常见需求,比如为了实现模块化或分层架构。以下是实现此功能的完整步骤,从项目中分离 Views 文件夹,将其移到另一个独立的文件夹或项目中,并确保视图渲染仍然正常。1. ..

如何让AI写出Google认可的文章?

让 AI 写出 Google 认可的文章,核心在于满足 Google 的搜索排名算法要求。这些要求通常围绕以下几个关键点展开:内容质量、用户体验 和 SEO 优化。以下是详细指导:一、内容质量确保文章原创且有价值避免抄袭:Goog..

ASP.NET 中的 Session 丢失或无法保持状态

在 ASP.NET 中,如果遇到 Session 丢失 或 无法保持状态 的问题,通常是由于配置错误、服务器设置不当或跨服务器部署的 Session 状态管理问题。以下是一些常见原因和解决方法:1. Session 超时ASP.NET 默认的 Sessio..

如何从.NET Framework迁移到.NET Core或.NET 6/7?

从 .NET Framework 迁移到 .NET Core 或 .NET 6/7 是一个提升性能和跨平台能力的关键过程。以下是迁移的主要步骤和注意事项:迁移步骤1. 评估当前项目依赖项检查:确保所有第三方库和NuGet包都有与 .NET Core/.NET 6..

如何优化ASP.NET Core应用的性能?

优化ASP.NET Core应用性能需要从代码、数据库、配置、服务器和部署等多个层面进行综合考虑。以下是一些优化ASP.NET Core应用性能的关键方法和技巧:1. 代码级优化使用异步编程:避免阻塞线程,通过async和await处理I..

chrome 开发者工具如何查看元素:hover时的样式

在 Chrome 开发者工具中查看元素的 :hover 样式,可以通过以下步骤实现:打开开发者工具:右键点击页面中的元素,然后选择“检查(Inspect)”或按下 F12 或 Ctrl + Shift + I (Windows) / Cmd + Option +..

简单优雅的Java ORM

Java的ORM框架有很多,但由于Java语言的限制大部分都不够优雅也不够简单,所以作者只能另辟蹊径造轮子了。照旧先看示例代码了解个大概,然后再解释实现原理。一、ORM示例Insertpublic CompletableFuture<Void> inser..

轻量级 JavaScript 动画库 mo.js使用教程

mo.js 是一个强大的 JavaScript 动画库,专为在网页项目中创建复杂动画和运动图形而设计。它注重提供平滑、动态的动画效果,并通过简单、模块化和灵活的组件让开发更加便捷。mo.js官网地址:https://mojs.github.io/..

文件上传JavaScript库FilePond使用教程

传统的文件上传控件往往显得笨拙且不够用户友好。FilePond的出现,为Web文件上传带来了革命性的改变。本文将详细介绍FilePond这一JavaScript库,探讨它如何优化文件上传流程,并提供无与伦比的用户体验。什么是FileP..

Swapy - 开源JavaScript js拖拽插件

Swapy是一个简单易用的JavaScript工具,能够将任何布局转换为拖拽交换布局。本文将详细介绍Swapy的功能、如何使用它,以及它在实际项目中的应用。什么是Swapy?Swapy是由TahaSh开发的一款开源JavaScript工具。它的核..

JavaScript 的 sessionStorage 能否加锁?

直接给 sessionStorage 加锁是不可能的。sessionStorage 的本质: sessionStorage 是浏览器提供的一种用于在当前浏览器会话中存储数据的机制。它存储在客户端,数据仅在当前浏览器窗口或标签页中有效。加锁的必要性..

如何防止web应用DOS攻击?

防止web应用DOS攻击的最好的方法是什么? 如何防止web应用DOS攻击? 与所有的拒绝服务(DOS)攻击相关的一件事是他们都不可能避免。最好的方法是把重点放在减少影响DOS攻击的方法上。如果你有一个网络,黑客想要玩一玩..

数据库SQL Server2014和SQL Server2019的区别和如何选择?

SQL Server 2014和SQL Server 2019是微软公司发布的两个版本的数据库管理系统,它们在性能、安全性以及可扩展性等方面各有特点。在选择这两个数据库版本时,需要根据系统需求、预算状况以及技术团队的熟悉程度等因素..

MySQL如何建数据库

MySQL是一款非常流行的关系型数据库管理系统。无论是在企业还是个人项目中,都经常使用MySQL数据库。在使用MySQL之前,需要先创建一个数据库。本文将介绍如何建立MySQL数据库。一、安装MySQL在开始建立MySQL数据库之..

c#实现与Java无差异的GZip压缩和GZip解压缩

c#实现与Java无差异的GZip压缩和GZip解压缩,其中有个坑就是GZip压缩的时候,只有在GZipStream在Dispose后调应对应MemoryStream.ToArray()所得到的结果才是正确的压缩数据。如果在zipStream.Write(bytes, 0, bytes.L..

jwt是什么?.NET Core API如何使用JwtBearer验证

JWT是JSON Web Token的缩写,是一种开放标准(RFC 7519),用于在网络上以安全和可靠的方式传输信息。它是一种被广泛使用的跨域身份验证解决方案,可以将用户信息、访问权限等加密后存储在Token中,然后通过网络传输..

发表回复

返回顶部