NetCore基于EasyNetQ的⾼级API使⽤RabbitMq
⼀、消息队列
消息队列作为分布式系统中的重要组件,常⽤的有MSMQ,RabbitMq,Kafa,ActiveMQ,RocketMQ。⾄于各种消息队列的优缺点⽐较,在这⾥就不做扩展了,⽹上资源很多。
更多内容可参考。我在这⾥选⽤的是RabbitMq。
安装和配置:
⼆、RabbitMq简单介绍
RabbitMQ是⼀款基于AMQP(⾼级消息队列协议),由Erlang开发的开源消息队列组件。是⼀款优秀的消息队列组件,他由两部分组成:服务端和客户端,客户端⽀持多种语⾔的驱动,如:.Net、JAVA、  Erlang等。在RabbitMq中⾸先要弄清楚的概念是交换机、队列、绑定。基本的消息通讯步骤就是⾸先定义ExChange,然后定义队列,然后绑定交换机和队列。
需要明确的⼀点⼉是,发布者在发送消息是,并不是把消息直接发送到队列中,⽽是发送到Exchang,然后由交互机根据定义的消息匹配规则,在将消息发送到队列中。
Exchange有四种消息消息分发规则:direct,topic,fanout,header。headers 匹配 AMQP 消息的 header ⽽不是路由键,此外 headers 交换器和 direct 交换器完全⼀致,但性能差很多,⽬前⼏乎⽤不到了。
详细的概念介绍推荐查看:
三、EasyNetQ使⽤
Easynetq是⼀个简单易⽤的Rabbitmq Net客户端。同时⽀持 NetFramework和NetCore。。它是针对RabbitMq Net客户端的进⼀步封装。关于EasyNetQ 的简单使⽤推荐教程:。
本⽂主要介绍基于EasyNeq的⾼级API的使⽤。EasyNetQ的作者在核⼼的IBus接⼝中尽量避免暴露AMQP中的交换机、队列、绑定这些概念,使⽤者即使不去了解这些概念,也能完成消息的发送接收。这相当简洁,但某些情况下,基于应⽤场景的需要,我们需要⾃定义交换机、队列、绑定这些信
息,EasyNetQ允许你这么做,这些都是通过IAdvanceBus接⼝实现。
3.1 项⽬装备
这⾥为了演⽰,⾸先新建⼀个项⽬,包括⼀个发布者,两个接收者,⼀个公共的类库
安装EasyNetQ: NuGet>Install-Package EasyNetQ
3.2 简单封装
在Common项⽬⾥⾯是针对Easynetq的使⽤封装,主要⽬录如下
在RabbitMq⽂件夹下,是针对消息发送接收的简单封装。
⾸先来看下RabbitMqManage,主要的发送和订阅操作都在这个类中。其中ISend接⼝定义了发送消息的规范,SendMessageManage是ISend的实现。IMessageConsume接⼝定义订阅规范。
MesArg 和PushMsg分别是订阅和发送需⽤到的参数类。RabbitMQManage是暴露在外的操作类。
⾸先看发送的代码
public enum SendEnum
{
订阅模式 = 1,
推送模式 = 2,
主题路由模式 = 3
}
public class PushMsg
{
///<summary>
///发送的数据
///</summary>
public object sendMsg { get; set; }
///<summary>
///消息推送的模式
///现在⽀持:订阅模式,推送模式,主题路由模式
///</summary>
public SendEnum sendEnum { get; set; }
///<summary>
///管道名称
/
//</summary>
public string exchangeName { get; set; }
///<summary>
///路由名称
///</summary>
public string routeName { get; set; }
}
internal interface ISend
{
Task SendMsgAsync(PushMsg pushMsg, IBus bus);
void  SendMsg(PushMsg pushMsg, IBus bus);
}
internal class SendMessageMange : ISend
{
public async Task SendMsgAsync(PushMsg pushMsg, IBus bus)
{
//⼀对⼀推送
var message = new Message<object>(pushMsg.sendMsg);
IExchange ex = null;
//判断推送模式
if (pushMsg.sendEnum == SendEnum.推送模式)
{
ex = bus.Advanced.hangeName, ExchangeType.Direct);
}
if (pushMsg.sendEnum == SendEnum.订阅模式)
{
//⼴播订阅模式
ex = bus.Advanced.hangeName, ExchangeType.Fanout);            }
if (pushMsg.sendEnum == SendEnum.主题路由模式)
{
//主题路由模式
ex = bus.Advanced.hangeName, ExchangeType.Topic);
}
await bus.Advanced.PublishAsync(ex, uteName.ToSafeString(""), false, message)            .ContinueWith(task =>
{
if (!task.IsCompleted && task.IsFaulted)//消息投递失败
{
//记录投递失败的消息信息
}
});
}
public void SendMsg(PushMsg pushMsg, IBus bus)
{
//⼀对⼀推送
var message = new Message<object>(pushMsg.sendMsg);
IExchange ex = null;
//判断推送模式
if (pushMsg.sendEnum == SendEnum.推送模式)
{
ex = bus.Advanced.hangeName, ExchangeType.Direct);                }
if (pushMsg.sendEnum == SendEnum.订阅模式)
{
//⼴播订阅模式
ex = bus.Advanced.hangeName, ExchangeType.Fanout);                }
if (pushMsg.sendEnum == SendEnum.主题路由模式)
{
//主题路由模式
ex = bus.Advanced.hangeName, ExchangeType.Topic);
}
bus.Advanced.Publish(ex, uteName.ToSafeString(""), false, message);
}
}
在EasyNetQ中对于异步发送消息的时候,消息是否送达Broker只需要查看异步发送⽅法最终执⾏成功还是失败,成功就表⽰消息送达,如果失败可以将失败后的消息存⼊数据库中,然后⽤后台线程轮询
数据库表,将失败后的消息进⾏重新发送。这种⽅式还可以进⼀步变成消息表,就是先将要发送的消息存⼊消息表中,然后后台线程轮询消息表来进⾏消息发送。⼀般这种⽅式被⼴泛⽤于分布式事务中,
将本地数据库操作和消息表写⼊放⼊同⼀个本地事务中,来保证消息发送和本地数据操作的同步成功,
因为我的系统中,分布式事务的涉及很少,所以就没这样去做,只是简单的在异步发送的时候监控下
是否发送失败,然后针对失败的消息做⼀个重新发送的机制。这⾥,推荐⼤佬的NetCore分布式事务解决⽅案 CAP 。
接着看⼀下消息订阅接收涉及的代码
public class MesArgs
{
///<summary>
///消息推送的模式
///现在⽀持:订阅模式,推送模式,主题路由模式
///</summary>
public SendEnum sendEnum { get; set; }
///<summary>
///管道名称
///</summary>
public string exchangeName { get; set; }
///<summary>
///对列名称
///</summary>
public string rabbitQueeName { get; set; }
///<summary>
///路由名称
///</summary>
public string routeName { get; set; }
}
public interface IMessageConsume
{
void Consume(string message);
}
在订阅中我定义了⼀个接⼝,最终业务代码中,所有的消息订阅类,都需要继续此接⼝
最后,我们来看下对外使⽤的操作类
public class RabbitMQManage
{
private volatile static IBus bus = null;
private static readonly object lockHelper = new object();
/
//<summary>
///创建服务总线
///</summary>
///<param name="config"></param>
///<returns></returns>
public static IBus CreateEventBus()
{
//获取RabbitMq的连接地址
//SystemJsonConfigManage 是我简单封装的⼀个json操作类,⽤于针对json⽂件的读写操作
var config = SystemJsonConfigManage.GetInstance().AppSettings["MeessageService"];
if (string.IsNullOrEmpty(config))
throw new Exception("消息地址未配置");
if (bus == null && !string.IsNullOrEmpty(config))
{
lock (lockHelper)
{
if (bus == null)
bus = RabbitHutch.CreateBus(config);
}
}
return bus;
}
/
//<summary>
///释放服务总线
///</summary>
public static void DisposeBus()
{
bus?.Dispose();
}
///<summary>
///消息同步投递
///</summary>
///<param name="pushMsg"></param>
/
//<returns></returns>
public static bool PushMessage(PushMsg pushMsg)
{
bool b = true;
try
{
if (bus == null)
CreateEventBus();
new SendMessageMange().SendMsg(pushMsg, bus);
b = true;
}
catch (Exception ex)
{
b = false;
}
return b;
}
///<summary>
///消息异步投递
///</summary>
///<param name="pushMsg"></param>
public static async Task PushMessageAsync(PushMsg pushMsg)
{
try
{
if (bus == null)
CreateEventBus();
await new SendMessageMange().SendMsgAsync(pushMsg, bus);
}
catch (Exception ex)
{
throw ex;
}
}
///<summary>
///消息订阅
///</summary>
public static void Subscribe<TConsum>(MesArgs args)
where TConsum : IMessageConsume,new()
{
if (bus == null)
CreateEventBus();
if (string.hangeName))
return;
Expression<Action<TConsum>> methodCall;
IExchange ex = null;
//判断推送模式
if (args.sendEnum == SendEnum.推送模式)
{
ex = bus.Advanced.hangeName, ExchangeType.Direct);            }
if (args.sendEnum == SendEnum.订阅模式)
{
//⼴播订阅模式
ex = bus.Advanced.hangeName, ExchangeType.Fanout);            }
if (args.sendEnum == SendEnum.主题路由模式)
{
//主题路由模式
ex = bus.Advanced.hangeName, ExchangeType.Topic);            }
IQueue qu;
if (string.IsNullOrEmpty(args.rabbitQueeName))
{
qu = bus.Advanced.QueueDeclare();
}
else
qu = bus.Advanced.QueueDeclare(args.rabbitQueeName);
bus.Advanced.Bind(ex, qu, uteName.ToSafeString(""));
bus.Advanced.Consume(qu, (body, properties, info) => Task.Factory.StartNew(() =>            {
try
{
lock (lockHelper)
{
var message = Encoding.UTF8.GetString(body);
//处理消息
methodCall = job => job.Consume(message);
methodCall.Compile()(new TConsum());
}
}
catch (Exception e)
{
throw e;
}
}));
}
}
这⾥⾯主要封装了消息的发送和订阅,以及IBus单例的创建。在后续的消息发送和订阅主要就通过此处来实现。我们看到⼀开始的类⽬结构中还有⼀个RaExMessageHandleJob类,这个类就是⼀个后台
循环任务,⽤来监测数据库中是否保存了发送失败的消息,如果有,则将消息取出,尝试重新发送。在此就不做多的介绍,⼤家可以根据⾃⼰的实际需求来实现。
3.3 发布者
现在来看⼀下消息发布者的代码
主要的发送代码都在Send类中,其中appsettings.json⾥⾯配置了Rabbitmq的连接地址,TestDto只是⼀个为了⽅便演⽰的参数类。
下⾯看⼀下Program⾥⾯的代码
很简单的⼀个发送消息调⽤。
然后来看⼀下Send类中的代码
public class Send
{
///<summary>
///发送消息
///</summary>
public static void SendMessage()
{
//需要注意⼀点⼉,如果发送的时候,在该管道下不到相匹配的队列框架将默认丢弃该消息
//推送模式
/
/推送模式下,需指定管道名称和路由键值名称
//消息只会被发送到和指定路由键值完全匹配的队列中
var directdto = new PushMsg()
{
activemq和rocketmq的区别sendMsg = new TestDto()
{
Var1 = "这是推送模式"
},
exchangeName = "message.directdemo",
routeName= "routekey",
sendEnum =SendEnum.推送模式
};
//同步发送,返回true或fasle true 发送成功,消息已存储到Rabbitmq中,false表⽰发送失败
var b= RabbitMQManage.PushMessage(directdto);
//异步发送,如果失败,失败的消息会被写⼊数据库,会有后台线程轮询数据库进⾏重新发送
//RabbitMQManage.PushMessageAsync(directlist);