Azure Event Hub 技术研究系列3-Event Hub接收事件

上篇博文中,我们通过编程的方式介绍了如何将事件消息发送到Azure Event Hub:

Azure Event Hub 技术研究系列2-发送事件到Event Hub

本篇文章中,我们继续:从Event Hub中接收事件。

1. 新建控制台工程 EventHubReceiver

2. 添加Nuget引用

Microsoft.Azure.EventHubs

Microsoft.Azure.EventHubs.Processor

Azure Event Hub 技术研究系列3-Event Hub接收事件

3. 实现IEventProcessor接口

MyEventProcessor

     using Microsoft.Azure.EventHubs;
using Microsoft.Azure.EventHubs.Processor;
using System.Threading.Tasks; public class MyEventProcessor : IEventProcessor
{
public Task CloseAsync(PartitionContext context, CloseReason reason)
{
Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
return Task.CompletedTask;
} public Task OpenAsync(PartitionContext context)
{
Console.WriteLine($"MyEventProcessor initialized. Partition: '{context.PartitionId}'");
return Task.CompletedTask;
} public Task ProcessErrorAsync(PartitionContext context, Exception error)
{
Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
return Task.CompletedTask;
} public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (var eventData in messages)
{
var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
Console.WriteLine($"Event message received. Partition: '{context.PartitionId}', Data: '{data}'");
} return context.CheckpointAsync();
}
}

4. Program程序

添加常量作为事件中心连接字符串、事件中心名称、存储帐户容器名称、存储帐户名称和存储帐户密钥。 添加以下代码,并将占位符替换为其对应的值。

        private const string EhConnectionString = "{Event Hubs connection string}";
private const string EhEntityPath = "{Event Hub path/name}"; //MyEventHub
private const string StorageContainerName = "{Storage account container name}"; //eventhubcontainer
private const string StorageAccountName = "{Storage account name}"; //linux1
private const string StorageAccountKey = "{Storage account key}";
private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", StorageAccountName, StorageAccountKey);

这里涉及到Azure Storage Account,必须为上篇博文中创建的事件中心MyEventHub指定一个存储账户和存储容器

Azure Event Hub 技术研究系列3-Event Hub接收事件

增加MainAysnc方法:注册事件处理器,处理事件消息

         /// <summary>
/// 注册事件处理器
/// </summary>
/// <param name="args"></param>
/// <returns></returns>
private static async Task MainAsync(string[] args)
{
Console.WriteLine("Registering EventProcessor..."); var eventProcessorHost = new EventProcessorHost(
EhEntityPath,
PartitionReceiver.DefaultConsumerGroupName,
EhConnectionString,
StorageConnectionString,
StorageContainerName); // Registers the Event Processor Host and starts receiving messages
await eventProcessorHost.RegisterEventProcessorAsync<MyEventProcessor>(); Console.WriteLine("Receiving. Press ENTER to stop worker.");
Console.ReadLine(); // Disposes of the Event Processor Host
await eventProcessorHost.UnregisterEventProcessorAsync();
}

Main函数

         static void Main(string[] args)
{
MainAsync(args).GetAwaiter().GetResult();
}

Run

Azure Event Hub 技术研究系列3-Event Hub接收事件

至此,我们实现了事件消息发送到Event Hub,同时从Event Hub接收处理事件消息。

周国庆

2017/5/18

上一篇:关于C#开发 windows服务进程


下一篇:centos下安装nginx并部署angular应用