上篇博文中,我们通过编程的方式介绍了如何将事件消息发送到Azure Event Hub:
Azure Event Hub 技术研究系列2-发送事件到Event Hub
本篇文章中,我们继续:从Event Hub中接收事件。
1. 新建控制台工程 EventHubReceiver
2. 添加Nuget引用
Microsoft.Azure.EventHubs
Microsoft.Azure.EventHubs.Processor
3. 实现IEventProcessor接口
MyEventProcessor
using Microsoft.Azure.EventHubs;
using Microsoft.Azure.EventHubs.Processor;
using System.Threading.Tasks; public class MyEventProcessor : IEventProcessor
{
public Task CloseAsync(PartitionContext context, CloseReason reason)
{
Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
return Task.CompletedTask;
} public Task OpenAsync(PartitionContext context)
{
Console.WriteLine($"MyEventProcessor initialized. Partition: '{context.PartitionId}'");
return Task.CompletedTask;
} public Task ProcessErrorAsync(PartitionContext context, Exception error)
{
Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
return Task.CompletedTask;
} public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (var eventData in messages)
{
var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
Console.WriteLine($"Event message received. Partition: '{context.PartitionId}', Data: '{data}'");
} return context.CheckpointAsync();
}
}
4. Program程序
添加常量作为事件中心连接字符串、事件中心名称、存储帐户容器名称、存储帐户名称和存储帐户密钥。 添加以下代码,并将占位符替换为其对应的值。
private const string EhConnectionString = "{Event Hubs connection string}";
private const string EhEntityPath = "{Event Hub path/name}"; //MyEventHub
private const string StorageContainerName = "{Storage account container name}"; //eventhubcontainer
private const string StorageAccountName = "{Storage account name}"; //linux1
private const string StorageAccountKey = "{Storage account key}";
private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", StorageAccountName, StorageAccountKey);
这里涉及到Azure Storage Account,必须为上篇博文中创建的事件中心MyEventHub指定一个存储账户和存储容器
增加MainAysnc方法:注册事件处理器,处理事件消息
/// <summary>
/// 注册事件处理器
/// </summary>
/// <param name="args"></param>
/// <returns></returns>
private static async Task MainAsync(string[] args)
{
Console.WriteLine("Registering EventProcessor..."); var eventProcessorHost = new EventProcessorHost(
EhEntityPath,
PartitionReceiver.DefaultConsumerGroupName,
EhConnectionString,
StorageConnectionString,
StorageContainerName); // Registers the Event Processor Host and starts receiving messages
await eventProcessorHost.RegisterEventProcessorAsync<MyEventProcessor>(); Console.WriteLine("Receiving. Press ENTER to stop worker.");
Console.ReadLine(); // Disposes of the Event Processor Host
await eventProcessorHost.UnregisterEventProcessorAsync();
}
Main函数
static void Main(string[] args)
{
MainAsync(args).GetAwaiter().GetResult();
}
Run
至此,我们实现了事件消息发送到Event Hub,同时从Event Hub接收处理事件消息。
周国庆
2017/5/18