前提
前段时间上班无事,上网冲浪看到了消息队列RabbitMQ,就想着学习一下,网上看了点资料在哔哩哔哩上看的到codeman讲的一个rabbitmq的视频,就跟着仔细学习一下,敲一下代码。视频地址: rabbitmq视频 。
RabbitMq介绍
什么是消息队列
MQ全称为Message Queue,即消息队列。“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。
应用场景
削峰填谷
在一个时间段很多用户同时进行请求我们的A系统,我的MQ容器就可以用来存储请求按照每秒多少的请求进行发送,减轻服务器的压力。
异步提速
所有的问题当你解决一个问题就会出现另外的问题,外部依赖多系统的稳定性就越差,MQ但凡挂了,系统就会出问题,后面就会使用mq集群来解决这一问题。
消息模型
点对点模式
在上图的模型中,有以下概念:
Producer:生产者,也就是要发送消息的程序
Consumer:消费者:消息的接受者,会一直等待消息到来。
Queue:消息队列。可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
点对点模式只会有一个消费者进行消费
代码附上
新增两个项目一个生产者 Z.RabbitMq.Producer,一个消费者Z.RabbitMQ.Consumer01
项目 Z.RabbitMq.Producer新增HelloProducer
类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 public class HelloProducer { public static void HelloWorldShow () { var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1" ; factory.Port = 5672 ; factory.UserName = "admin" ; factory.Password = "admin" ; factory.VirtualHost = "my_vhost" ; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare(RabbitConstant.QUEUE_HELLO_WORLD, true , false , false , null ); Console.ForegroundColor = ConsoleColor.Red; string message = "hello CodeMan 666" ; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish("" , RabbitConstant.QUEUE_HELLO_WORLD, null , body); Console.WriteLine($"producer消息:{message} 已发送" ); } } } }
项目 Z.RabbitMQ.Consumer01新增HelloConsumer
类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public class HelloConsumer { public static void HelloWorldShow () { var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1" ; factory.Port = 5672 ; factory.UserName = "admin" ; factory.Password = "admin" ; factory.VirtualHost = "my_vhost" ; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare(RabbitConstant.QUEUE_HELLO_WORLD, true , false , false , null ); Console.ForegroundColor = ConsoleColor.Cyan; EventingBasicConsumer consumers = new EventingBasicConsumer(channel); consumers.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); channel.BasicAck(ea.DeliveryTag, false ); Console.WriteLine($"Consumer01接收消息:{message} " ); }; channel.BasicConsume(RabbitConstant.QUEUE_HELLO_WORLD, false , consumers); Console.WriteLine("Press [Enter] to exit" ); Console.Read(); } } } }
work消息模型
工作队列或者竞争消费者模式
work queues与入门程序相比,多了一个消费端,两个消费端共同消费同一个队列中的消息,但是一个消息只能被一个消费者获取。
接下来我们来模拟这个流程:
P:生产者:任务的发布者
C1:消费者1:领取任务并且完成任务,假设完成速度较慢(模拟耗时)
C2:消费者2:领取任务并且完成任务,假设完成速度较快
代码附上
新增一个工具类用来获取rabbitmq的连接信息
1 2 3 4 5 6 7 8 9 10 11 12 13 public class RabbitUtils { public static ConnectionFactory GetConnection () { var factory = new ConnectionFactory(); factory.HostName = "127.0.0.1" ; factory.Port = 5672 ; factory.UserName = "admin" ; factory.Password = "admin" ; factory.VirtualHost = "my_vhost" ; return factory; } }
消费者1(C1)在刚刚的 Z.RabbitMQ.Consumer01新增SmsReceive
类
在Program.cs
中的main函数中进行调用 SmsReceive.Sender();
消费者1 延迟30ms接受到信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public class SmsReceive { public static void Sender () { var connection = RabbitUtils.GetConnection().CreateConnection(); var channel = connection.CreateModel(); channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true , false , false , null ); channel.BasicQos(0 , 1 , false ); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Thread.Sleep(30 ); Console.WriteLine($"SmsSender-发送短信成功:{message} " ); channel.BasicAck(ea.DeliveryTag, false ); }; channel.BasicConsume(RabbitConstant.QUEUE_SMS, false , consumer); Console.WriteLine("Press [Enter] to exit" ); Console.Read(); } }
消费者2(C2)在刚刚的 Z.RabbitMQ.Consumer02新增SmsReceive
类
消费者1 延迟60ms接受到信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class SmsReceive { public static void Sender () { var connection = RabbitUtils.GetConnection().CreateConnection(); var channel = connection.CreateModel(); channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true , false , false , null ); channel.BasicQos(0 , 1 , false ); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Thread.Sleep(60 ); Console.WriteLine($"SmsSender-发送短信成功:{message} " ); channel.BasicAck(ea.DeliveryTag, false ); }; channel.BasicConsume(RabbitConstant.QUEUE_SMS, false , consumer); Console.WriteLine("Press [Enter] to exit" ); Console.Read(); } }
生产者Z.RabbitMq.Producer中创建SmsSender类在main函数进行调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class SmsSender { public static void Sender () { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true , false , false , null ); for (int i = 0 ; i < 100 ; i++) { Sms sms = new Sms("乘客" + i, "139000000" + i, "您的车票已预定成功" ); string jsonSms = JsonConvert.SerializeObject(sms); var body = Encoding.UTF8.GetBytes(jsonSms); channel.BasicPublish("" , RabbitConstant.QUEUE_SMS, null , body); Console.WriteLine($"正在发送内容:{jsonSms} " ); } Console.WriteLine("发送数据成功" ); } } } }
运行结构如下
能者多劳
消费者1比消费者2的效率要快,一次任务的耗时较短
消费者2大量时间处于空闲状态,消费者1一直忙碌
通过channel.BasicAck(ea.DeliveryTag, false);
来完成能者多劳的效果,在完成上一次请求之后再去取下一条消息,这就会出现服务器快的消费的更多,慢的消费的更少。
发布订阅模式
Publish/subscribe(交换机类型:Fanout,也称为广播 )
和前面两种模式不同:
声明Exchange,不再声明Queue
发送消息到Exchange,不再发送到Queue,通过exchange发送到queue上
消费者1收到的天气
项目.RabbitMq.Consumer01
创建WeatherFanout
使用exchange(交换机)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class WeatherFanout { public static void Weather () { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER, ExchangeType.Fanout); channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true , false , false , null ); channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "" ); channel.BasicQos(0 , 1 , false ); var consumer = new EventingBasicConsumer(channel); consumer.Received += ((model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"百度收到的气象信息:{message} " ); channel.BasicAck(ea.DeliveryTag, false ); }); channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false , consumer); Console.WriteLine("Press [Enter] to exit" ); Console.Read(); } } } }
消费者2收到的天气
项目.RabbitMq.Consumer02
创建WeatherFanout
使用exchange(交换机)
代码与消费者01一样
生产者发送天气
生产者把消息推送到交换机上
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class WeatherFanout { public static void Weather () { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { string message = "20度" ; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER, "" , null , body); Console.WriteLine("天气信息发送成功!" ); } } } }
最后得到效果
Routing 路由模型
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
队列与交换机的绑定,不能是任意绑定,而是要指定一个RoutingKey
消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey
Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会接收消息
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class WeatherDirect { public static void Weather () { Dictionary<string , string > area = new Dictionary<string , string >(); area.Add("china.hunan.changsha.20210525" , "中国湖南长沙20210525天气数据" ); area.Add("china.hubei.wuhan.20210525" , "中国湖北武汉20210525天气数据" ); area.Add("china.hubei.xiangyang.20210525" , "中国湖北襄阳20210525天气数据" ); area.Add("us.cal.lsj.20210525" , "美国加州洛杉矶20210525天气数据" ); using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { foreach (var item in area) { channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, item.Key, null , Encoding.UTF8.GetBytes(item.Value)); } Console.WriteLine("气象信息发送成功!" ); } } } }
消费者1
接受百度路由的路由消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class WeatherDirect { public static void Weather () { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct); channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true , false , false , null ); channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hunan.changsha.20210525" ); channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20210525" ); channel.BasicQos(0 , 1 , false ); var consumer = new EventingBasicConsumer(channel); consumer.Received += ((model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"百度收到的气象信息:{message} " ); channel.BasicAck(ea.DeliveryTag, false ); }); channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false , consumer); Console.WriteLine("Press [Enter] to exit" ); Console.Read(); } } } }
消费者2
接受新浪的路由信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 public class WeatherDirect { public static void Weather () { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct); channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true , false , false , null ); channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.xiangyang.20210525" ); channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20210525" ); channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20210525" ); channel.BasicQos(0 , 1 , false ); var consumer = new EventingBasicConsumer(channel); consumer.Received += ((model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"新浪收到的气象信息:{message} " ); channel.BasicAck(ea.DeliveryTag, false ); }); channel.BasicConsume(RabbitConstant.QUEUE_SINA, false , consumer); Console.WriteLine("Press [Enter] to exit" ); Console.Read(); } } } }
最后得到的效果
新浪接收对应新浪的routingkey的信息
百度接收对应百度的routingkey的信息
Topics 通配符模式
routingkey支持通配符匹配格式
通配符格式
Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过
Topic类型Exchange可以让队列在绑定RoutingKey的时候使用通配符
RoutingKey一般都是由一个或多个单词组成,多个单词之间以“.”分隔,例如:item.insert
通配符规则:#匹配一个或多个词,*恰好匹配一个词,例如item.#能够匹配item.insert.user或者item.insert,item.只能匹配item.insert或者item.user
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class WeatherTopic { public static void Weather () { Dictionary<string , string > area = new Dictionary<string , string >(); area.Add("china.hunan.changsha.20210525" , "中国湖南长沙20210525天气数据" ); area.Add("china.hubei.wuhan.20210525" , "中国湖北武汉20210525天气数据" ); area.Add("china.hubei.xiangyang.20210525" , "中国湖北襄阳20210525天气数据" ); area.Add("us.cal.lsj.20210525" , "美国加州洛杉矶20210525天气数据" ); using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { foreach (var item in area) { channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, item.Key, null , Encoding.UTF8.GetBytes(item.Value)); } Console.WriteLine("气象信息发送成功!" ); } } } }
消费者1
获取交换机中通配符为china.#
的信息
(“china.hunan.changsha.20210525”, “中国湖南长沙20210525天气数据”);
(“china.hubei.wuhan.20210525”, “中国湖北武汉20210525天气数据”);
(“china.hubei.xiangyang.20210525”, “中国湖北襄阳20210525天气数据”);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public class WeatherTopic { public static void Weather () { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, ExchangeType.Topic); channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true , false , false , null ); channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.#" ); channel.BasicQos(0 , 1 , false ); var consumer = new EventingBasicConsumer(channel); consumer.Received += ((model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"百度收到的气象信息:{message} " ); channel.BasicAck(ea.DeliveryTag, false ); }); channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false , consumer); Console.WriteLine("Press [Enter] to exit" ); Console.Read(); } } } }
消费者2
获取交换机中通配符为china.hubei.*.20210525
的信息
(“china.hubei.wuhan.20210525”, “中国湖北武汉20210525天气数据”)
(“china.hubei.xiangyang.20210525”, “中国湖北襄阳20210525天气数据”)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public class WeatherTopic { public static void Weather () { using (var connection = RabbitUtils.GetConnection().CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, ExchangeType.Topic); channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true , false , false , null ); channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.hubei.*.20210525" ); channel.BasicQos(0 , 1 , false ); var consumer = new EventingBasicConsumer(channel); consumer.Received += ((model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"新浪收到的气象信息:{message} " ); channel.BasicAck(ea.DeliveryTag, false ); }); channel.BasicConsume(RabbitConstant.QUEUE_SINA, false , consumer); Console.WriteLine("Press [Enter] to exit" ); Console.Read(); } } } }
最后得到的效果
百度获取china.#
的信息
新浪获取china.hubei.*.20210525
的信息
RPC
基本概念:
Callback queue 回调队列,客户端向服务器发送请求,服务器端处理请求后,将其处理结果保存在一个存储体中。而客户端为了获得处理结果,那么客户在向服务器发送请求时,同时发送一个回调队列地址reply_to。
Correlation id 关联标识,客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有correlation_id属性,这样客户端在回调队列中根据correlation_id字段的值就可以分辨此响应属于哪个请求。
流程说明:
当客户端启动的时候,它创建一个匿名独享的回调队列。
在 RPC 请求中,客户端发送带有两个属性的消息:一个是设置回调队列的 reply_to 属性,另一个是设置唯一值的 correlation_id 属性。
将请求发送到一个 rpc_queue 队列中。
服务器等待请求发送到这个队列中来。当请求出现的时候,它执行他的工作并且将带有执行结果的消息发送给 reply_to 字段指定的队列。
客户端等待回调队列里的数据。当有消息出现的时候,它会检查 correlation_id 属性。如果此属性的值与请求匹配,将它返回给应用
分享几题面试题
RabbitMQ中消息可能有的几种状态?
alpha: 消息内容(包括消息体、属性和 headers) 和消息索引都存储在内存中 。
beta: 消息内容保存在磁盘中,消息索引保存在内存中。
gamma: 消息内容保存在磁盘中,消息索引在磁盘和内存中都有 。
delta: 消息内容和索引都在磁盘中 。
死信队列?
DLX,全称为 Dead-Letter-Exchange,死信交换器,死信邮箱。当消息在一个队列中变成死信 (dead message) 之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX,绑定 DLX 的队列就称之 为死信队列。
导致的死信的几种原因?
消息被拒(Basic.Reject /Basic.Nack) 且 requeue = false。
消息TTL过期。
队列满了
到这里就结束,大家如果需要看视频学习就是点最上面的链接就行了
源码:github