概述
Stream 数据类型是在 Redis 版本 5.0 中添加的,它表示消息的仅追加日志。redis.io 上记录的所有 stream related commands 已在 StackExchange.Redis 客户端库中实现。阅读"Introduction to Redis Streams",以获取有关原始 Redis 命令以及如何使用流的更多信息。
写入流
流中的每条消息或条目均由 StreamEntry
类型表示。每个流的条目包含一个唯一的ID和一个名称/值对数组。名称/值对由 NameValueEntry
类型表示。
使用以下命令将具有单个名称/值对的简单消息添加到流中:
var db = redis.GetDatabase();
var messageId = db.StreamAdd("event_stream", "foo_name", "bar_value");
// messageId = 1518951480106-0
由 StreamAdd
返回的消息ID由将消息添加到流中的毫秒时间和序列号组成。如果在同一毫秒时间创建了两个或更多消息,则序列号用于防止ID冲突。
可以使用以下方法将多个名称/值对写入流:
var values = new NameValueEntry[]
{
new NameValueEntry("sensor_id", "1234"),
new NameValueEntry("temp", "19.8")
};
var db = redis.GetDatabase();
var messageId = db.StreamAdd("sensor_stream", values);
You also have the option to override the auto-generated message ID by passing your own ID to the StreamAdd
method. Other optional parameters allow you to trim the stream's length.
您还可以选择通过将自己的ID传递给StreamAdd方法来覆盖自动生成的消息ID。其他可选参数使您可以调整流的长度。
db.StreamAdd("event_stream", "foo_name", "bar_value", messageId: "0-1", maxLength: 100);
读取流
通过使用 StreamRead
或 StreamRange
方法从流中进行读取。
var messages = db.StreamRead("event_stream", "0-0");
上面的代码将从ID "0-0"
到流的末尾读取所有消息。你可以选择使用可选的 count
参数来限制返回的消息数。
StreamRead
方法还允许你一次从多个流中读取:
var streams = db.StreamRead(new StreamPosition[]
{
new StreamPosition("event_stream", "0-0"),
new StreamPosition("score_stream", "0-0")
});
Console.WriteLine($"Stream = {streams.First().Key}");
Console.WriteLine($"Length = {streams.First().Entries.Length}");
你可以使用 countPerStream
可选参数来限制每个流返回的消息数。
StreamRange
方法允许你返回流中的一系列条目。
var messages = db.StreamRange("event_stream", minId: "-", maxId: "+");
-
和 +
特殊字符表示可能的最小和最大ID。这些值是没有设置参数时的默认值。你还可以选择通过使用 messageOrder
参数来反向读取流。StreamRange
方法还提供了通过使用 count
参数来限制返回的条目数的功能。
var messages = db.StreamRange("event_stream",
minId: "0-0",
maxId: "+",
count: 100,
messageOrder: Order.Descending);
流的信息
StreamInfo
方法提供了读取有关流的基本信息的能力:流的第一个和最后一个条目,流的长度,使用者组的数量等。此信息可用于以更有效的方式处理流。
var info = db.StreamInfo("event_stream");
Console.WriteLine(info.Length);
Console.WriteLine(info.FirstEntry.Id);
Console.WriteLine(info.LastEntry.Id);
消费者组
使用使用者组可让你扩展跨多个工作人员或使用者的流的处理。请阅读“Introduction to Redis Streams”,以获取有关消费者群体的详细信息。
以下内容创建了一个使用者组,并告诉 Redis 从流中的哪个位置开始读取。如果你在第一次创建流之前调用该方法,则默认情况下,StreamCreateConsumerGroup
方法将为你创建流。你可以通过为createStream可选参数传递false来覆盖此默认行为。
// Returns true if created, otherwise false.
db.StreamCreateConsumerGroup("events_stream", "events_consumer_group", "$");
// or
db.StreamCreateConsumerGroup("events_stream", "events_consumer_group", StreamPosition.NewMessages);
特殊字符 "$"
表示使用者组将只读取在创建使用者组之后创建的消息。如果要阅读流中已经存在的消息,则可以提供流中的任何位置。
// Begin reading from the first position in the stream.
db.StreamCreateConsumerGroup("events_stream", "events_consumer_group", "0-0");
使用 StreamReadGroup
方法将消息读入使用者。此方法接受消息ID作为参数之一。当ID传递给 StreamReadGroup
时,Redis 将仅返回给定使用者的待处理消息,换句话说,它将仅返回使用者已读取的消息。
要将新消息读入使用者,请使用特殊字符 ">"
或 StreamPosition.NewMessages
。 ">"
特殊字符表示从未读取过的消息,从未传递给其他消费者。请注意,消费者组中的消费者是在调用 StreamReadGroup
方法时首次使用时自动创建的。
// Read 5 messages into two consumers.
var consumer_1_messages = db.StreamReadGroup("events_stream", "events_cg", "consumer_1", ">", count: 5);
var consumer_2_messages = db.StreamReadGroup("events_stream", "events_cg", "consumer_2", ">", count: 5);
消费者读取了一条消息后,其状态对于该消费者变为“待处理”状态,其他任何消费者都无法通过 StreamReadGroup
方法读取该消息。可以通过使用 StreamReadGroup
方法并为消费者提供在待处理消息范围内的ID来读取消费者的待处理消息。
// Read the first pending message for the "consumer_1" consumer.
var message = db.StreamReadGroup("events_stream", "events_cg", "consumer_1", "0-0", count: 1);
还可以通过调用 StreamPending
和 StreamPendingMessages
方法来检索待处理的消息信息。StreamPending
返回有关待处理消息数,每个使用者的待处理消息以及最高和最低待处理消息ID的高级信息。
var pendingInfo = db.StreamPending("events_stream", "events_cg");
Console.WriteLine(pendingInfo.PendingMessageCount);
Console.WriteLine(pendingInfo.LowestPendingMessageId);
Console.WriteLine(pendingInfo.HighestPendingMessageId);
Console.WriteLine($"Consumer count: {pendingInfo.Consumers.Length}.");
Console.WriteLine(pendingInfo.Consumers.First().Name);
Console.WriteLine(pendingInfo.Consumers.First().PendingMessageCount);
使用 StreamPendingMessages
方法检索有关给定使用者的待处理消息的详细信息。
// Read the first pending message for the consumer.
var pendingMessages = db.StreamPendingMessages("events_stream",
"events_cg",
count: 1,
consumerName: "consumer_1",
minId: pendingInfo.LowestPendingMessageId);
Console.WriteLine(pendingMessages.Single().MessageId);
Console.WriteLine(pendingMessages.Single().IdleTimeInMilliseconds);
消息在等待消费者处理之前,直到通过调用 StreamAcknowledge
方法得到确认为止。消息被确认后, StreamReadGroup
不能再访问。
// Returns the number of messages acknowledged.
db.StreamAcknowledge("events_stream", "events_cg", pendingMessage.MessageId);
StreamClaim
方法可用于将消费者使用的消息所有权更改为其他消费者。
// Change ownership to consumer_2 for the first 5 messages pending for consumer_1.
var pendingMessages = db.StreamPendingMessages("events_stream",
"events_cg",
count: 5,
consumerName: "consumer_1",
minId: "0-0");
db.StreamClaim("events_stream",
"events_cg",
claimingConsumer: "consumer_2",
minIdleTimeInMs: 0,
messageIds: pendingMessages.Select(m => m.MessageId).ToArray());
还有其他几种使用使用者组处理流的方法。请参考 Streams 单元测试以了解这些方法及其用法。
原文地址:Stream