Dapr-发布/订阅

前言

 前篇文章对Dapr的状态管理进行了解,本篇继续对 订阅/发布 构建块进行了解。

一、定义:

 发布订阅的概念来自于事件驱动架构(EDA)的设计思想,这是一种让程序(应用、服务)之间解耦的主要方式,通过发布订阅的思想也可以实现服务之间的异步调用。而大部分分布式应用都会依赖这样的发布订阅解耦模式。

 Dapr-发布/订阅

 步骤:

  1. 发布服务器将消息发送到消息代理。
  2. 订阅服务器将绑定到消息代理上的订阅。
  3. 消息代理将消息的副本转发给感兴趣的订阅。
  4. 订阅服务器从其订阅使用消息。

 但是不同的消息中间件之间存在细微的差异,项目使用不同的产品需要实现不同的实现类,虽然是明智的决策,但必须编写和维护抽象及其基础实现。此方法需要复杂、重复且容易出错的自定义代码。

 Dapr为了解决这种问题,提供开箱即用的消息传送抽象和实现,封装在 Dapr 构建基块中。业务系统只需调用跟据Dapr的要求实现订阅发布即可。

二、工作原理:

 Dapr 发布&订阅构建基块提供了平台无关的 API 框架来发送和接收消息。你的服务将消息发布到一个命名主题(topic)。服务订阅主题(topic)来使用消息。

 服务在 Dapr Sidecar上调用 pub/sub API。 然后,Sidecar将调用一个预定义的 Dapr pub/sub 组件来封装特定的消息代理产品。 下图 显示了 Dapr 发布/订阅 消息传递堆栈。

 Dapr-发布/订阅

三、功能:

  • 发布/订阅API

  Dapr 发布&订阅构建基块提供了一个与平台无关的 API 框架来发送和接收消息。

  服务将消息发布到指定主题, 业务服务订阅主题以使用消息。

  服务在 Dapr sidecar 上调用 pub/sub API。然后,sidecar 调用预定义 Dapr pub/sub 组件。

  任何编程平台都可以使用 Dapr 本机 API 通过 HTTP 或 gRPC 调用构建基块。若要发布消息,请进行以下 API 调用:

http://localhost:<dapr-port>/v1.0/publish/<pub-sub-name>/<topic>

  上述调用中有几个特定于 Dapr 的 URL 段:

    • <dapr-port> 提供 Dapr sidecar 侦听的端口号。

    • <pub-sub-name> 提供所选 Dapr pub/sub 组件的名称。

    • <topic> 提供消息发布到的主题的名称。

  • 消息格式

  要启用消息路由并为每个消息提供附加上下文,Dapr 使用 CloudEvents 1.0 规范 作为其消息格式。 使用 Dapr 应用程序发送的任何信息都将自动包入 Cloud Events 信封中,datacontenttype 属性使用 Content-Type 头部值。

  Dapr 实现以下 Cloud Events 字段:

    • id 
    • source
    • specversion
    • type
    • datacontenttype (可选)

  下面的示例显示了 CloudEvent v1.0 中序列化为 JSON 的 XML 内容:

{
    "specversion" : "1.0",
    "type" : "xml.message",
    "source" : "https://example.com/message",
    "subject" : "Test XML Message",
    "id" : "id-1234-5678-9101",
    "time" : "2020-09-23T06:23:21Z",
    "datacontenttype" : "text/xml",
    "data" : "<note><to>User1</to><from>user2</from><message>hi</message></note>"
}
  • 订阅消息

  Dapr 应用程序可以订阅已发布的 topics。 Dapr 允许您的应用程序有两种方法来订阅 topics:

   声明式:其中定义在外部文件中:

apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: myevent-subscription
spec:
  topic: test_topic  //主题
  route: /TestPubSub //路由
  pubsubname: pubsub //名称
scopes:
- frontend       //为该应用启用订阅

   上面的示例显示了 test_topic主题的事件订阅,使用组件 pubsub

    • route 告诉 Dapr 将所有主题消息发送到应用程序中的 /TestPubSub 端点。

    • scopes 为 frontend 应用启用订阅

   编程方式:订阅在用户代码中定义

  • 消息传递

   Dapr 保证消息传递 at-least-once 语义。 这意味着,当应用程序使用发布/订阅 API 将消息发布到主题时,Dapr 可确保此消息至少传递给每个订阅者一次(at least once)

  • 消费者群体和竞争行消费者模式

   多个消费组、多个应用程序实例使用一个消费组,这些都将由 Dapr 自动处理。 当同一个应用程序的多个实例(相同的 ID) 订阅主题时,Dapr 只将每个消息传递给该应用程序的一个实例。

   Dapr-发布/订阅

   同样,如果两个不同的应用程序 (不同的 ID) 订阅同一主题,那么 Dapr 将每个消息仅传递到每个应用程序的一个实例。

  •  Topic作用域:

   默认情况下,支持Dapr发布/订阅组件的所有主题 (例如,Kafka、Redis、RabbitMQ) 都可用于配置该组件的每个应用程序。 为了限制哪个应用程序可以发布或订阅 topic,Dapr 提供了 topic 作用域限定。 这使您能够让应用程序允许发布哪些主题以及应用程序允许订阅哪些主题。

pub/sub 主题作用域限定

为每个 pub/sub 组件定义发布/订阅范围。 您可能有一个名为 pubsub 的 pub/sub 组件,它有一组范围设置,另一个 pubsub2 另有一组范围设置。

要使用这个主题范围,可以设置一个 pub/sub 组件的三个元数据属性:

    • spec.metadata.publishingScopes
      • 分号分隔应用程序列表& 逗号分隔的主题列表允许该 app 发布信息到主题列表
      • 如果在 publishingScopes (缺省行为) 中未指定任何内容,那么所有应用程序可以发布到所有主题
      • 要拒绝应用程序发布信息到任何主题,请将主题列表留空 (app1=;app2=topic2)
      • 例如, app1=topic1;app2=topic2,topic3;app3= 允许 app1 发布信息至 topic1 ,app2 允许发布信息到 topic2 和 topic3 ,app3 不允许发布信息到任何主题。
    • spec.metadata.subscriptionScopes
      • 分号分隔应用程序列表& 逗号分隔的主题列表允许该 app 订阅主题列表
      • 如果在 subscriptionScopes (缺省行为) 中未指定任何内容,那么所有应用程序都可以订阅所有主题
      • 例如, app1=topic1;app2=topic2,topic3 允许 app1 订阅 topic1 ,app2 可以订阅 topic2 和 topic3
    • spec.metadata.allowedTopics
      • 一个逗号分隔的允许主题列表,对所有应用程序。
      • 如果未设置 allowedTopics (缺省行为) ,那么所有主题都有效。 subscriptionScopes 和 publishingScopes 如果存在则仍然生效。
      • publishingScopes 或 subscriptionScopes 可用于与 allowedTopics 的 conjuction ,以添加限制粒度
  • 消息生存时间:

   Dapr 可以在每个消息的基础上设置超时。 表示如果消息未从 Pub/Sub 组件读取,则消息将被丢弃。 这是为了防止未读消息的积累。 在队列中超过配置的 TTL 的消息就可以说它挂了。  

四、.NET Core 应用

 1、设置Pub/Sub组件:

  本机默认下安装了Redis Staram,在 Windows 上打开%UserProfile%\.dapr\components\pubsub.yaml 组件文件以验证:

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""

 2、实现发布/订阅功能 :

  添加控制器(PubSubController)

[Route("api/[controller]")]
[ApiController]
public class PubSubController : ControllerBase
{
    private readonly ILogger<PubSubController> _logger;
    private readonly DaprClient _daprClient;
    public PubSubController(ILogger<PubSubController> logger, DaprClient daprClient)
    {
        _logger = logger;
        _daprClient = daprClient;
    }

    /// <summary>
    /// 发布消息
    /// </summary>
    /// <returns></returns>
    [HttpGet("pub")]
    public async Task<ActionResult> PubAsync()
    {
        var data = new WeatherForecast() { Summary = "city", Date = DateTime.Now };
        await _daprClient.PublishEventAsync<WeatherForecast>("pubsub", "test_topic", data);
        return Ok();
    }

    /// <summary>
    /// 消费消息
    /// </summary>
    /// <returns></returns>
    [Topic("pubsub", "test_topic")]
    [HttpPost("sub")]
    public async Task<ActionResult> Sub()
    {
        Stream stream = Request.Body;
        byte[] buffer = new byte[Request.ContentLength.Value];
        stream.Position = 0L;
        await stream.ReadAsync(buffer, 0, buffer.Length);
        string content = Encoding.UTF8.GetString(buffer);
        _logger.LogInformation("testsub" + content);
        return Ok(content);
    }
}

  Startup.cs中调整:

public class Startup
{
    public Startup(IConfiguration configuration)
    {
        Configuration = configuration;
    }
    public IConfiguration Configuration { get; }
    // This method gets called by the runtime. Use this method to add services to the container.
    public void ConfigureServices(IServiceCollection services)
    {
        //注入Dapr
        services.AddControllers().AddDapr();
    }
    // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        // 使用CoudEvent
        app.UseCloudEvents();
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }
        app.Use((context, next) =>
        {
            context.Request.EnableBuffering();
            return next();
        });
        app.UseRouting();
        app.UseAuthorization();
        app.UseEndpoints(endpoints =>
        {
            endpoints.MapControllers()
            //订阅处理
            endpoints.MapSubscribeHandler();;
        });
    }
}

 3、dapr运行程序:

dapr run --dapr-http-port 3501 --app-port 5001  --app-id frontend dotnet  .\FrontEnd.dll

 4、调用发布命令:

http://127.0.0.1:3501/v1.0/invoke/frontend/method/api/pubsub/pub

 5、通过Dapr cli 发布消息:

dapr publish --publish-app-id frontend --pubsub pubsub --topic test_topic --data '{"date":"0001-01-01T00:00:00","temperatureC":0,"temperatureF":32,"summary":null}'

总结

 pub/sub 模式可帮助你分离分布式应用程序中的服务。 Dapr 发布&订阅构建基块简化了在应用程序中实现此行为。

 通过 Dapr pub/sub,可以将消息发布到特定 主题。构建基块还将查询服务,以确定 (订阅) 主题。

 可以通过 HTTP 或特定于语言的 SDK 之一(例如用于 Dapr 的 .NET SDK)本机使用 Dapr pub/sub。 .NET SDK 与 ASP.NET 平台紧密集成。

 使用 Dapr,可以将受支持的消息代理产品插入应用程序。 然后,无需更改应用程序的代码,即可交换消息代理。

上一篇:手把手教你学Dapr - 5. 状态管理


下一篇:手把手教你学Dapr - 4. 服务调用