.NET Core使用Cap
准备工作
CAP
需要依赖消息队列和数据存储,支持情况如下:
消息队列
1 | Kafka |
数据存储
1 | SqlServer |
两者自行搭配选择即可,其中SqlServer
和RabbitMQ
在docker中的安装可参照我另一篇备忘开发常用的docker镜像
项目设定
- 两个服务,分别为
ServiceA
,ServiceB
SqlServer
使用CapDemo
数据库,默认sa
账号,密码为Today_is_20200328
,端口为默认的1433
端口RabbitMQ
安装地址为本机,默认端口,账号密码为admin/admin
创建ServiceA
-
在
Visual Studio 2019
中创建空白解决方案,取名为Fantasy.CapDemo
-
在解决方案中新建
Asp.net core
项目,名字为Fantasy.CapDemo.ServiceA
,.net core
版本为3.1
-
nuget
安装以下4个包1
2
3
4DotNetCore.CAP
DotNetCore.CAP.Dashboard
DotNetCore.CAP.RabbitMQ
DotNetCore.CAP.SqlServer -
在
Startup.cs
的ConfigureServices
方法中加入以下代码1
2
3
4
5
6
7
8
9
10
11services.AddCap(options =>
{
options.UseSqlServer("Password=Today_is_20200328;Persist Security Info=True;User ID=sa;Initial Catalog=CapDemo;Data Source=127.0.0.1");
options.UseRabbitMQ(r =>
{
r.HostName = "127.0.0.1";
r.UserName = "admin";
r.Password = "admin";
});
options.UseDashboard();
}); -
此时直接运行项目,如果数据库与消息队列均能正常连接,则会在数据库中生成以下两张表
1
2cap.Published
cap.Received -
在
Controllers/WeatherForecastController.cs
中编写消息发布代码
6.1 通过构造函数依赖注入ICapPublisher
对象
6.2 发布消息只需要调用ICapPublisher
对象的PublishAsync
或Publish
方法,传入参数为队列名和消息值,全部代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
namespace Fantasy.CapDemo.ServiceA.Controllers
{
[ ]
[ ]
public class WeatherForecastController : ControllerBase
{
private static readonly string[] Summaries = new[]
{
"Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching"
};
private readonly ILogger<WeatherForecastController> _logger;
private readonly ICapPublisher _capPublisher;
public WeatherForecastController(ILogger<WeatherForecastController> logger,ICapPublisher capPublisher)
{
_logger = logger;
_capPublisher = capPublisher;
}
[ ]
public async Task<IEnumerable<WeatherForecast>> Get()
{
await _capPublisher.PublishAsync("Fantasy.cap.demo.show.time", DateTime.Now);
var rng = new Random();
return Enumerable.Range(1, 5).Select(index => new WeatherForecast
{
Date = DateTime.Now.AddDays(index),
TemperatureC = rng.Next(\-20, 55),
Summary = Summaries[rng.Next(Summaries.Length)]
})
.ToArray();
}
}
} -
编写接收消息代码,接收消息有两种方式,一种在控制器中,即代码写在
***Controller.cs
中,另一种是在非控制器中,一般为***Service.cs
中,以下分两步执行,因为CAP
在默认情况下一个服务多个地方进行订阅,只会进行一次接收,除非进行分组(后面介绍),所以8/9
两个步骤在测试时需要注释其中一个,只保留另一个 -
编写在控制器中接收消息的代码,直接在控制器中写对应方法,方法参数为发送消息时传入的消息值类型,方法无返回值,再加上方法标签
CapSubscribe
即可,具体代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
using Microsoft.AspNetCore.Mvc;
namespace Fantasy.CapDemo.ServiceA.Controllers
{
public class SubController : Controller
{
[ ]
public void ShowTime(DateTime value)
{
Console.WriteLine($"接受方:SubController.ShowTime 接收到值:{value}");
}
}
}做到这步可以进行一次测试,就可以发现访问
/WeatherForecast
这个接口,会发送一条当前时间的消息出去,同时接收方也会在控制台打印出接收到的消息 -
编写服务中接收消息的代码,需要注意接收消息的类需要继承
ICapSubscribe
接口,同时这里为了方便进行依赖注入,这里我们也自己定义了一个ISubscriberService
接口,同时服务需要在Startup.cs
的ConfigureServices
方法中进行注册
9.1ISubscriberService.cs
代码如下1
2
3
4
5
6
7
8
9
10
11
12using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Fantasy.CapDemo.ServiceA.Services
{
public interface ISubscriberService
{
void ReceivedShowTimeMessage(DateTime value);
}
}9.2
SubscriberService.cs
代码如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using DotNetCore.CAP;
namespace Fantasy.CapDemo.ServiceA.Services
{
public class SubscriberService: ISubscriberService, ICapSubscribe
{
[ ]
public void ReceivedShowTimeMessage(DateTime value)
{
Console.WriteLine($"接受方:SubscriberService.ReceivedShowTimeMessage 接收到值:{value}");
}
}
}9.3 服务注册代码如下
1
services.AddScoped<ISubscriberService, SubscriberService>();
注意:这行服务注册代码需要在
services.AddCap
这个方法之前注册,否则CAP
在进行接口扫描的时候找不到对应已经实现了ICapSubscribe
接口的实现类,则无法进行订阅者注册
注释掉第8步,进行第9步的测试,效果应该与第8步一致 -
有时候一个消息需要多个消费者同时消费,则可以使用
Group
概念,消息发送代码不变,在标记CapSubscribe
的时候,使用Group
参数,多个Group订阅同个消息Id,消息则会对应分发到多个Group中,同个Group只能消费一次,即一个Group对一个消息Id订阅多次,也会只接收一次,具体代码如下
SubscriberService.cs中新增Group参数
1
[ ]
SubController.cs中新增Group参数
1
[ ]
- 测试Group功能,将项目跑起来,此时再访问发送消息的api,会在控制台中打印出两条消息接收记录
创建ServiceB
ServiceB使用EFCore
来进行配置,找ServiceA
步骤2创建Fantasy.CapDemo.ServiceB
项目,因为.Net Core 3.1
没有自带EFCore
,所以这里除了CAP
的4个包之外,还需要nuget安装EFCore
的两个包,汇总起来需要nuget安装的包如下:
1 | DotNetCore.CAP |
创建ServiceDbContext.cs
文件,代码如下
1 | using System; |
在Startup.cs
中配置EFCore
和CAP
,具体代码如下:
1 | services.AddDbContext<ServiceDbContext>(options => |
后续操作与ServiceA
类似,需要注意:多个站点订阅同个消息Id,消息会往多个站点进行发送
附上CAP的Github地址:https://github.com/dotnetcore/CAP
还有一些配合EF的操作还没来得及写,有空再说吧