首页 程序笔记 Windows下RabbitMQ安装和使用EasyNetQ组件操作

Windows下RabbitMQ安装和使用EasyNetQ组件操作

RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现,是实现消息队列应用的一个中间件,消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。EasyNetQ则是基于官方.NET组件RabbitMQ.Client 的又一层封装,使用起来更加方便。

1、RabbitMQ基础知识

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

**RabbitMQ **是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

RabbitMQ的特点:强大的应用程序消息传递;使用方便;运行在所有主要操作系统上;支持大量开发人员平台;开源和商业支持。消息队列的模式有两种模式:P2P(Point to Point),P2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。Publish/Subscribe(Pub/Sub),包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber) 。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

EasyNetQ的目标是提供一个使.NET中的RabbitMQ尽可能简单的库。在EasyNetQ中消息应由.NET类型表示,消息应通过其.NET类型进行路由。EasyNetQ按消息类型进行路由。发布消息时,EasyNetQ会检查其类型,并根据类型名称,命名空间和装配体给出一个路由密钥。在消费方面,用户订阅类型。订阅类型后,该类型的消息将路由到订户。默认情况下,EasyNetQ使用Newtonsoft.Json库将.NET类型序列化为JSON。这具有消息是人类可读的优点,因此您可以使用RabbitMQ管理应用程序等工具来调试消息问题。

EasyNetQ是在RabbitMQ.Client库之上提供服务的组件集合。这些操作可以像序列化,错误处理,线程编组,连接管理等。它们由mini-IoC容器组成。您可以轻松地用自己的实现替换任何组件。因此,如果您希望XML序列化而不是内置的JSON,只需编写一个ISerializer的实现并将其注册到容器。以下是官方提供的一个结构图,这个结构图可以很好的解析该组件的结构:

2、RabbitMQ的环境准备

本处主要介绍在Windows系统中安装RabbitMQ。

1. 下载安装erlang

下载地址 http://www.erlang.org/downloads(根据操作系统选择32还64位)

2. 下载安装rabbitmq-server

下载地址 http://www.rabbitmq.com/install-windows.html

下载后获得两个安装文件,按照顺序安装即可

安装erlang环境后,一般会添加了一个ERLANG_HOME的系统变量,指向erlang的安装目录路径,如下所示(一般都自动添加好了,自己可以确认一下)

安装RabbitMQ后,在程序里面可以看到

我们使用它的命令行来启动RabbitMQ的服务

查看安装是否成功命令 :rabbitmqctl status

安装成功,在浏览器中输入 http://127.0.0.1:15672/,可以看到如下界面,使用默认的账号密码均为guest登陆进行管理

guest 账号是管理员账号,可以添加Exchanges,Queues,Admin。但我们一般不使用guest账号,可以选择用命令来添加账号和权限,也可以使用管理界面进行添加相应的内容。

例如我添加相应的用户账号

一般我们还需要添加虚拟机,默认的虚拟机为/,我这里添加了一个虚拟机myvhost。

然后绑定账号到虚拟机上即可。

3、EasyNetQ组件的使用

EasyNetQ组件的使用方式比较简单,跟很多组件都类似,例如:建立连接,进行操作做等等,对于EasyNetQ组件也是如此。

在.NET中使用EasyNetQ,要求至少基于 .NET4.5的框架基础上进行开发,可以直接在VS项目上使用NuGet的程序包进行添加EasyNetQ的引用。

一般添加引用后,至少包含了下面图示的几个引用DLL。

1)创建连接:

使用EasyNetQ连接RabbitMQ,是在应用程序启动时创建一个IBus对象,并且,在应用程序关闭时释放该对象。

RabbitMQ连接是基于IBus接口的,当IBus中的方法被调用,连接才会开启。创建一个IBus对象的方法如下:

var bus = RabbitHutch.CreateBus(“host=myServer;virtualHost=myVirtualHost;username=admin;password=123456”);

与RabbitMQ服务器的延迟连接由IBus接口表示,创建连接的方式连接字符串由格式为key = value的键/值对组成,每一个用分号(;)分隔。

host,host=localhost 或者host =192.168.1.102或者host=my.rabbitmq.com,如果用到集群配置的话,那么可以用逗号将服务地址隔开,例如host=a.com,b.com,c.com

virtualHost,虚拟主机,默认为'/'

username,用户登录名

password,用户登录密码

requestedHeartbeat,心跳设置,默认是10秒

prefetchcount,默认是50

pubisherConfirms,默认为false

persistentMessages,消息持久化,默认为true

product,产品名

platform,平台

timeout,默认为10秒

一般我们在代码里面测试的话,简化连接代码如下所示。

 //初始化bus对象
 bus = RabbitHutch.CreateBus("host=localhost");

2)关闭连接:

bus.Dispose();

要关闭连接,只需简单地处理总线,这将关闭EasyNetQ使用的连接,渠道,消费者和所有其他资源。

如果我们在Winform窗体里面初始化一个IBus对象,那么在窗体关闭的时候,关闭这个接口即可。

private void FrmPublisher_FormClosed(object sender, FormClosedEventArgs e)
{
    //关闭IBus接口
    if(bus != null)
    {
        bus.Dispose();
    }
}

3)发布消息:

EasyNetQ支持最简单的消息模式是发布和订阅。发布消息后,任意消费者可以订阅该消息,也可以多个消费者订阅。并且不需要额外配置。首先,如上文中需要先创建一个IBus对象,然后,在创建一个可序列化的.NET对象。调用Publish方法即可。

var message = new MyMessage { Text = "Hello Rabbit" };
bus.Publish(message);

4)订阅消息:

EasyNetQ提供了消息订阅,当调用Subscribe方法时候,EasyNetQ会创建一个用于接收消息的队列,不过与消息发布不同的是,消息订阅增加了一个参数,subscribe_id.代码如下:

bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));

第一个参数是订阅id,另外一个是delegate参数,用于处理接收到的消息。这里要注意的是,subscribe_id参数很重要,假如开发者用同一个subscribeid订阅了同一种消息类型两次或者多次,RabbitMQ会以轮流地给每个订阅的队列发送消息。如果用不同的subscribeid订阅同一种消息类型,那么生成的每一个队列都会收到该消息。

需要注意的是,在收到消息处理消息时候,不要占用太多的时间,会影响消息的处理效率,所以,遇到占用长时间的处理方法,最好用异步处理。

为了测试发布和订阅消息,我们可以建立几个不同的项目来进行测试,如发布放在一个Winform项目,订阅放在一个Winform项目,另外一个项目放置共享的消息对象定义,如下所示。

定义消息对象类代码如下

/// <summary>
/// 定义的MQ消息类型
/// </summary>
public class TextMessage
{
    public string Text { get; set; }
}

然后在发布消息的Winform项目上创建一个处理的窗体,并添加如下代码。

namespace MyRabbitMQ.Publisher{
/// <summary>
/// 测试RabbitMQ消息队列的发布
/// </summary>
public partial class FrmPublisher : DevExpress.XtraEditors.XtraForm    {
    //构建一个IBus公用接口对象
    private IBus bus = null;
    public FrmPublisher()
    {
        InitializeComponent();
        //初始化bus对象
        bus = RabbitHutch.CreateBus("host=localhost");
        //对指定消息类型进行回应
        bus.Respond<MyRequest, MyResponse>(request => new MyResponse { Text = "Responding to: "+ request.Text});
        //收到消息后输出到控制台上显示
        bus.Receive("my.queue", x => x            .Add<MyMessage>(message => Console.WriteLine(message.ToJson()))
        .Add<MyOtherMessage>(message => Console.WriteLine(message.ToJson())));
    }
}

发布消息的处理代码,如下代码所示

private void btnSend_Click(object sender, EventArgs e)
{
    if (bus != null)
    {
        bus.Publish(new TextMessage
        {
            Text = this.txtContent.Text
        });
    }
}

然后在创建一个类似窗体,用来订阅消息的处理窗体,如下所示代码和窗体。

namespace MyRabbitMQ.Subcriber{   
    /// <summary>
    /// 测试RabbitMQ消息队列的订阅
    /// </summary>
    public partial class FrmSubcriber : DevExpress.XtraEditors.XtraForm    {
        //构建一个IBus公用接口对象
        private IBus bus = null;

        public FrmSubcriber()
        {
            InitializeComponent();

            //初始化bus对象
            bus = RabbitHutch.CreateBus("host=localhost");
            if(bus != null)
            {
                //订阅一个消息,并对接收到的消息进行处理,展示在控件上
                bus.Subscribe<TextMessage>("test", (msg) =>                {
                    StringBuilder sb = new StringBuilder();
                    sb.AppendLine(msg.Text + "," + DateTime.Now.ToString());
                    sb.AppendLine(this.txtContent.Text);

                    this.txtContent.Invoke(new MethodInvoker(delegate()
                    {
                        this.txtContent.Text = sb.ToString();
                    }));
                });
            }

            //使用消息发送接口发送消息
            bus.Send("my.queue", new MyMessage { Text = "Hello Widgets!" });
            bus.Send("my.queue", new MyOtherMessage { Text = "Hello wuhuacong!" });
        }

发送请求获取响应的代码如下所示。

        private void btnRequest_Click(object sender, EventArgs e)
        {
            //定义请求消息的对象
            var request = new MyRequest()
            {
                Text = string.Format("请求消息,{0}", DateTime.Now)
            };

            //异步获取请求消息的结果并进行处理,展示应答消息在窗体中的
            var task = bus.RequestAsync<MyRequest, MyResponse>(request);
            task.ContinueWith(response =>
            {
                StringBuilder sb = new StringBuilder();
                sb.AppendLine(response.Result.Text);
                sb.AppendLine(this.txtContent.Text);
                this.txtContent.Invoke(new MethodInvoker(delegate()
                {
                    this.txtContent.Text = sb.ToString();
                }));
            });
        }

两个项目联合进行测试如下界面所示。

发布者多次发送消息的情况下,订阅者中,会进行消息的轮询处理,也就是进行均匀分配。

5)消息发送(Send)和接收(Receive)

与Publish/Subscribe略有不同的是,Send/Receive 可以自己定义队列名称。

//发送端代码
bus.Send("my.queue", new MyMessage{ Text = "Hello Widgets!" });
//接收端代码
bus.Receive<MyMessage>("my.queue", message => Console.WriteLine("MyMessage: {0}", message.Text));

并且,也可以在同一个队列上发送不同的消息类型,Receive方法可以这么写:

bus.Receive("my.queue", x => x
    .Add<MyMessage>(message => deliveredMyMessage = message)
    .Add<MyOtherMessage>(message => deliveredMyOtherMessage = message));

如果消息到达队列,但是没有发现相应消息类型的处理时,EasyNetQ会发送一条消息到error队列,并且,带上一个异常信息:No handler found for message type <message type>。与Subscribe一样,如果在同一个队列,同一个消息类型,多次调用Receive方法时,消息会轮流发送给每个Receive端。

6)远程过程调用:

var request = new TestRequestMessage {Text = "Hello from the client! "};bus.Request<TestRequestMessage, TestResponseMessage>(request, response => 
    Console.WriteLine("Got response: '{0}'", response.Text));

7)RPC服务器:

bus.Respond<TestRequestMessage, TestResponseMessage>(request => 
    new TestResponseMessage{ Text = request.Text + " all done!" });

8)记录器:

var logger = new MyLogger() ;var bus = RabbitHutch.CreateBus(“my connection string”, x => x.Register<IEasyNetQLogger>(_ => logger));

9)路由:

Publish方法,可以加一个topic参数。

bus.Publish(message, "X.A");

消息订阅方可以通过路由来过滤相应的消息。

* 匹配一个字符

#匹配0个或者多个字符

所以 X.A.2 会匹配到 "#", "X.#", ".A." 但不会匹配 "X.B.*" 或者 "A". 当消息订阅需要用到topic时候,需要调用Subscribe的重载方法

bus.Subscribe("my_id", handlerOfXDotStar, x => x.WithTopic("X.*"));
bus.Subscribe("my_id", handlerOfStarDotB, x => x.WithTopic("*.B"));

上述这种方式,会将消息轮询发送给两个订阅者,如果只需要一个订阅者的话,可以这么调用:

bus.Subscribe("my_id", handler, x => x.WithTopic("X.*").WithTopic("*.B"));

RabbitMQ具有非常好的功能,基于主题的路由,允许订阅者基于多个标准过滤消息。*(星号)匹配一个字。#(哈希)匹配为零个或多个单词。

RabbitMQ的应用场景,一般在快速处理订单,以及异步的多任务处理中可以得到很好的体现,下面是几个应用场景。

邮件和短消息的处理

订单的解耦处理

RabbitMQ的服务器架构

3、RabbitMQ查询状态出现错误的处理

安装成功之后使用rabbitmqctl status命令之后出现如下错误。

Status of node rabbit@WUHUACONG ...
Error: unable to perform an operation on node 'rabbit@WUHUACONG'. Please see diagnostics information and suggestions below.

Most common reasons for this are:

 * Target node is unreachable (e.g. due to hostname resolution, TCP connection or firewall issues)
 * CLI tool fails to authenticate with the server (e.g. due to CLI tool's Erlang cookie not matching that of the server)
 * Target node is not running

In addition to the diagnostics info below:

 * See the CLI, clustering and networking guides on http://rabbitmq.com/documentation.html to learn more
 * Consult server logs on node rabbit@WUHUACONG

DIAGNOSTICS
===========

attempted to contact: [rabbit@WUHUACONG]

rabbit@WUHUACONG:
  * connected to epmd (port 4369) on WUHUACONG
  * epmd reports node 'rabbit' uses port 25672 for inter-node and CLI tool traffic
  * TCP connection succeeded but Erlang distribution failed

  * Authentication failed (rejected by the remote node), please check the Erlang cookie


Current node details:
 * node name: rabbitmqcli100@WUHUACONG
 * effective user's home directory: C:\Users\Administrator
 * Erlang cookie hash: RgaUM2coc+rxIhJrfLS7Jw==

这个问题出现比较常见,主要原因是两个目录的.erlang.cookie文件内容不一样。


要确保.erlang.cookie文件的一致性,不知道什么原因导致了C:\Users{UserName}.erlang.cookie和默认情况下C:\WINDOWS\System32\config\systemprofile.erlang.cookie不一致了,将Windows目录下的拷贝到用户目录下就可以了。

反正无论如何,两个地址的Cookie内容一致就可以了,然后重启下RabbitMQ服务器即可正常运行,并可以正常获取它的状态信息。


5

站心网

RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现,是实现消息队列应用的一个中间件,..

为您推荐

5 个顶级的 JavaScript Ajax 组件和库

在这篇文章中,我们将介绍一些用于AJAX调用的最好的JS库,包括jQuery,Axios和Fetch。欢迎查看代码示例!AJAX是用来对服务器进行异步HTTP调用的一系列web开发技术客户端框架。 AJAX即Asynchronous JavaScript and XM..

Mysql查询的一些操作(查表名,查字段名,查当月,查一周,查当天)

查询数据库中所有表名select table_name from information_schema.tables where table_schema='tools' and table_type='base table';查询指定数据库中指定表的所有字段名column_nameselect column_n..

使用 html2canvas 实现截图功能

html2canvas 是一个开源的 JavaScript 库,用于将网页上的 HTML 元素渲染成图像。它通过遍历页面的 DOM 树和计算样式,然后将其绘制到 <canvas> 元素上,最终生成图片。该库不依赖服务器端,而是通过浏览器端的 Java..

使用SuperWebSocket实现Web消息推送

在大部分Web系统中,我们可能遇到需要向客户端推送消息的需求。SuperWebSocket第三方库能让我们轻松的完成任务。SuperWebSocket第三方库可以从网上下载,不过通过Visual Studio Nuget安装更快。引用SuperWebSocket相..

.NET C# 使用Hook钩子实现全局监听键盘和鼠标

C# 是一种面向对象的编程语言,具有丰富的类库和工具支持,适用于各种类型的应用程序开发。Windows 提供了一种称为"钩子"(Hook)的机制,允许拦截并处理系统级别的事件,如键盘按键和鼠标移动。通过结合 C# 和 Hook..

C#使用 Attribute 实现 AOP 功能

在 C# 中,通过自定义 Attribute 并结合一些技术(如动态代理、反射等)可以实现 AOP(面向切面编程)。AOP 通常用于日志记录、性能监控、权限验证等横切关注点。以下是一个使用 C# Attribute 实现 AOP 功能的示例。..

ABP.Net Core使用教程(一)启动模版项目

只需要简单的3步:1,到官网下载模版项目 https://aspnetboilerplate.com/Templates2,用VS2017打开,将Web.Host设置为启动项3,在程序包管理器控制台(Nuget控制台)里设定默认项目为EntityFrameworkCore,执行命令..

C#中的线程安全的集合ConcurrentQueue使用示例

在多线程编程中,如何安全地在不同线程之间共享数据是一个非常重要的问题。C# 为我们提供了一些专门设计的线程安全集合,其中之一就是 ConcurrentQueue<T>。它是一种先进先出(FIFO)的数据结构,专门为多线程环境设..

CSS砌体布局示例和使用场景

CSS砌体布局(Masonry Layout)CSS砌体布局是一种网页布局技术,它的灵感来源于砖石墙的排列方式,类似于“拼图”或“拼砖”的效果。在砌体布局中,元素的排列并不完全遵循传统的网格布局规则,..

使用CSS columns-visibility实现砌体布局

CSS的 columns 属性(如 columns、column-count 和 column-width)通常用于多列文本布局,而不是直接用于砌体布局。然而,结合 columns 和 visibility 属性,可以在某些情况下实现类似砌体布局的效果,虽然它并不完..

使用System.Linq.Dynamic.Core扩展库动态构建 LINQ 查询

System.Linq.Dynamic.Core 是一个扩展库,用于在运行时动态构建 LINQ 查询,支持字符串形式的表达式解析和动态查询操作。它是 .NET 的一个强大工具,适合处理需要灵活定义查询逻辑的场景,例如动态过滤、排序、投影..

小米开源智能家居平台 ha_xiaomi_home 使用示例

小米近期在 GitHub 上开源了名为“ha_xiaomi_home”的项目,即 Home Assistant 米家集成组件。该组件由小米官方支持,旨在让用户在 Home Assistant 中集成和控制小米 IoT 智能设备。主要特点:官方支持:..

C#13新特性 使用System.Threading.Lock简化线程同步

C# 13 引入了新的线程同步类型 System.Threading.Lock,它通过作用域管理的方式简化了锁的使用,使代码更加清晰可靠。本文将全面介绍 System.Threading.Lock 的功能、适用场景,并提供完整的运行示例程序。1. 什么是..

微软官方Microsoft.Extensions.AI库使用示例

Microsoft.Extensions.AI 库介绍Microsoft.Extensions.AI 是一个扩展库,用于在 .NET 应用程序中轻松集成人工智能(AI)服务,例如 OpenAI、Azure OpenAI 和其他支持文本生成或语言模型的 API。通过与 Microsoft.Ext..

分享5个开源的.NET Excel读写操作库

本文给大家分享 5 个开源的 .NET Excel 读写操作库,它们广泛用于处理 Excel 文件,包括读取、写入、导入和导出数据。1. EPPlus简介:EPPlus 是功能强大的 .NET 库,用于创建和读取 Excel 文件(.xlsx 和 .xlsm 格式..

.Net Core中Dapper的使用详解

1.安装Dapper这里直接使用Nuget安装。安装版本是1.50.5安装完成之后,发现Nuget下已经有了Dapper。2.创建DapperHelper接下来创建一个DapperHelper帮助类,来进行读取数据库连接字符串,打开数据库等操作。public cla..

JavaScript中字典的常用操作

字典是一种以键值对存在的数据结构,他的底层是Array数组字典初始化和数组初始化的区别:数组的初始化:var arr = [1,2,3,4,5];//使用中括号字典的初始化: var names = {“a”:“aaa”,“b”:“bbb”,“c”:“ccc”}..

最新CentOS7安装搭建shadowsocks服务端+客户端使用图文教程

使用的CentOS版本是7.9,其他版本也可以。超级推荐的是搭建shadowsocks服务端,安装配置都很简单,几分钟就搞定,客户端支持PC移动端,下面是安装shadowsocks的过程,只要复制粘贴命令就行了,文件夹路径都不需要改..

ASP.NET 使用Entity Framework (EF) 创建迁移修改SQLite数据库表结构

在 ASP.NET 中,使用 Entity Framework (EF) 创建并连接 SQLite 数据库是一种轻量级、高效的数据库管理方式。以下是详细步骤:安装必要的 NuGet 包安装EntityFrameworkCore.Sqlite包:Install-Package Microsoft.Ent..

使用shields.io来实时显示GitHub项目star、watch和fork的数量

如何获取GitHub repo实时的star,watch和fork数量呢?这里推荐一个Shields.io工具,可以实时生成GitHub徽章,同时显示star数。显示效果如下:什么是 Shields.io?Shields.io 是一个开源项目,用于生成各种类型的徽章..

发表回复

返回顶部