.NET Core使用Cap

准备工作

CAP需要依赖消息队列和数据存储,支持情况如下:
消息队列

1
2
3
Kafka  
RabbitMQ
AzureServiceBus

数据存储

1
2
3
4
SqlServer  
MySql
PostgreSql
MongoDB

两者自行搭配选择即可,其中SqlServerRabbitMQ在docker中的安装可参照我另一篇备忘开发常用的docker镜像

项目设定

  • 两个服务,分别为ServiceA,ServiceB
  • SqlServer使用CapDemo数据库,默认sa账号,密码为Today_is_20200328,端口为默认的1433端口
  • RabbitMQ安装地址为本机,默认端口,账号密码为admin/admin

创建ServiceA

  1. Visual Studio 2019中创建空白解决方案,取名为Fantasy.CapDemo

  2. 在解决方案中新建Asp.net core项目,名字为Fantasy.CapDemo.ServiceA.net core版本为3.1

  3. nuget安装以下4个包

    1
    2
    3
    4
    DotNetCore.CAP  
    DotNetCore.CAP.Dashboard
    DotNetCore.CAP.RabbitMQ
    DotNetCore.CAP.SqlServer
  4. Startup.csConfigureServices方法中加入以下代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    services.AddCap(options =>  
    {
    options.UseSqlServer("Password=Today_is_20200328;Persist Security Info=True;User ID=sa;Initial Catalog=CapDemo;Data Source=127.0.0.1");
    options.UseRabbitMQ(r =>
    {
    r.HostName = "127.0.0.1";
    r.UserName = "admin";
    r.Password = "admin";
    });
    options.UseDashboard();
    });
  5. 此时直接运行项目,如果数据库与消息队列均能正常连接,则会在数据库中生成以下两张表

    1
    2
    cap.Published  
    cap.Received
  6. Controllers/WeatherForecastController.cs中编写消息发布代码
    6.1 通过构造函数依赖注入ICapPublisher对象
    6.2 发布消息只需要调用ICapPublisher对象的PublishAsyncPublish方法,传入参数为队列名和消息值,全部代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    using System;  
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    using DotNetCore.CAP;
    using Microsoft.AspNetCore.Mvc;
    using Microsoft.Extensions.Logging;

    namespace Fantasy.CapDemo.ServiceA.Controllers
    {
    [ApiController]
    [Route("[controller]")]
    public class WeatherForecastController : ControllerBase
    {
    private static readonly string[] Summaries = new[]
    {
    "Freezing", "Bracing", "Chilly", "Cool", "Mild", "Warm", "Balmy", "Hot", "Sweltering", "Scorching"
    };

    private readonly ILogger<WeatherForecastController> _logger;
    private readonly ICapPublisher _capPublisher;

    public WeatherForecastController(ILogger<WeatherForecastController> logger,ICapPublisher capPublisher)
    {
    _logger = logger;
    _capPublisher = capPublisher;
    }

    [HttpGet]
    public async Task<IEnumerable<WeatherForecast>> Get()
    {
    await _capPublisher.PublishAsync("Fantasy.cap.demo.show.time", DateTime.Now);

    var rng = new Random();
    return Enumerable.Range(1, 5).Select(index => new WeatherForecast
    {
    Date = DateTime.Now.AddDays(index),
    TemperatureC = rng.Next(\-20, 55),
    Summary = Summaries[rng.Next(Summaries.Length)]
    })
    .ToArray();
    }
    }
    }
  7. 编写接收消息代码,接收消息有两种方式,一种在控制器中,即代码写在***Controller.cs中,另一种是在非控制器中,一般为***Service.cs中,以下分两步执行,因为CAP在默认情况下一个服务多个地方进行订阅,只会进行一次接收,除非进行分组(后面介绍),所以8/9两个步骤在测试时需要注释其中一个,只保留另一个

  8. 编写在控制器中接收消息的代码,直接在控制器中写对应方法,方法参数为发送消息时传入的消息值类型,方法无返回值,再加上方法标签CapSubscribe即可,具体代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    using System;  
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    using DotNetCore.CAP;
    using Microsoft.AspNetCore.Mvc;

    namespace Fantasy.CapDemo.ServiceA.Controllers
    {
    public class SubController : Controller
    {
    [CapSubscribe("Fantasy.cap.demo.show.time")]
    public void ShowTime(DateTime value)
    {
    Console.WriteLine($"接受方:SubController.ShowTime 接收到值:{value}");
    }
    }
    }

    做到这步可以进行一次测试,就可以发现访问/WeatherForecast这个接口,会发送一条当前时间的消息出去,同时接收方也会在控制台打印出接收到的消息

  9. 编写服务中接收消息的代码,需要注意接收消息的类需要继承ICapSubscribe接口,同时这里为了方便进行依赖注入,这里我们也自己定义了一个ISubscriberService接口,同时服务需要在Startup.csConfigureServices方法中进行注册
    9.1 ISubscriberService.cs代码如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    using System;  
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;

    namespace Fantasy.CapDemo.ServiceA.Services
    {
    public interface ISubscriberService
    {
    void ReceivedShowTimeMessage(DateTime value);
    }
    }

    9.2 SubscriberService.cs代码如下

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    using System;  
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
    using DotNetCore.CAP;

    namespace Fantasy.CapDemo.ServiceA.Services
    {
    public class SubscriberService: ISubscriberService, ICapSubscribe
    {
    [CapSubscribe("Fantasy.cap.demo.show.time")]
    public void ReceivedShowTimeMessage(DateTime value)
    {
    Console.WriteLine($"接受方:SubscriberService.ReceivedShowTimeMessage 接收到值:{value}");
    }
    }
    }

    9.3 服务注册代码如下

    1
    services.AddScoped<ISubscriberService, SubscriberService>();  

    注意:这行服务注册代码需要在services.AddCap这个方法之前注册,否则CAP在进行接口扫描的时候找不到对应已经实现了ICapSubscribe接口的实现类,则无法进行订阅者注册
    注释掉第8步,进行第9步的测试,效果应该与第8步一致

  10. 有时候一个消息需要多个消费者同时消费,则可以使用Group概念,消息发送代码不变,在标记CapSubscribe的时候,使用Group参数,多个Group订阅同个消息Id,消息则会对应分发到多个Group中,同个Group只能消费一次,即一个Group对一个消息Id订阅多次,也会只接收一次,具体代码如下
    SubscriberService.cs中新增Group参数

1
[CapSubscribe("Fantasy.cap.demo.show.time",Group = "SubscriberService")]  
SubController.cs中新增Group参数
1
[CapSubscribe("Fantasy.cap.demo.show.time",Group = "SubController")]  
  1. 测试Group功能,将项目跑起来,此时再访问发送消息的api,会在控制台中打印出两条消息接收记录

创建ServiceB

ServiceB使用EFCore来进行配置,找ServiceA步骤2创建Fantasy.CapDemo.ServiceB项目,因为.Net Core 3.1没有自带EFCore,所以这里除了CAP的4个包之外,还需要nuget安装EFCore的两个包,汇总起来需要nuget安装的包如下:

1
2
3
4
5
6
DotNetCore.CAP  
DotNetCore.CAP.Dashboard
DotNetCore.CAP.RabbitMQ
DotNetCore.CAP.SqlServer
Microsoft.EntityFrameworkCore
Microsoft.EntityFrameworkCore.SqlServer

创建ServiceDbContext.cs文件,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
using System;  
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;

namespace Fantasy.CapDemo.ServiceB.Infrastructure
{
public class ServiceDbContext: DbContext
{
public ServiceDbContext(DbContextOptions<ServiceDbContext> options):base(options)
{
}
}
}

Startup.cs中配置EFCoreCAP,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
services.AddDbContext<ServiceDbContext>(options =>  
{
options.UseSqlServer(
"Password=Today_is_20200328;Persist Security Info=True;User ID=sa;Initial Catalog=CapDemo;Data Source=127.0.0.1");
});

services.AddCap(options =>
{
options.UseEntityFramework<ServiceDbContext>();
options.UseRabbitMQ(r =>
{
r.HostName = "127.0.0.1";
r.UserName = "admin";
r.Password = "admin";
});
options.UseDashboard();
});

后续操作与ServiceA类似,需要注意:多个站点订阅同个消息Id,消息会往多个站点进行发送

附上CAP的Github地址:https://github.com/dotnetcore/CAP

还有一些配合EF的操作还没来得及写,有空再说吧