一、简介
基于Lettuce连接Redis单机和集群的客户端代码,在这篇文章中,给出了使用 Lettuce API 的大致流程。
本文将着力分析一下背后的源码。
首先,回顾一下使用 Lettuce 客户端调用 get 命令的大致流程:
// 步骤1:连接信息
RedisURI redisURI = RedisURI.create("localhost", 6379);
// 步骤2:创建Redis客户端
RedisClient client = RedisClient.create(redisURI);
// 步骤3:建立连接
StatefulRedisConnection<String, String> connection = client.connect();
// 步骤4:异步调用命令
RedisAsyncCommands<String, String> asyncCommands = connection.async();
// asyncCommands 是类 RedisAsyncCommandsImpl 的实例对象
RedisFuture<String> future = asyncCommands.get("hello");
RedisAsyncCommandsImpl 是 AbstractRedisAsyncCommands 子类,以下就是其 get 方法
@Override
public RedisFuture<V> get(K key) {
return dispatch(commandBuilder.get(key));
}
二、构建Redis命令对象
2.1 RedisCommandBuilder.get
Command<K, V, V> get(K key) {
notNullKey(key);
// 第一个参数类型是 CommandType
// 第二个参数类型是 CommandOutput,ValueOutput 是它的子类
// 第三个参数类型是 泛型,表示 Key 的类型
return createCommand(GET, new ValueOutput<>(codec), key);
}
2.1 RedisCommandBuilder.createCommand
BaseRediscreateCommand 是 RedisCommandBuilder 的基类,看它的 createCommand 方法:
protected <T> Command<K, V, T> createCommand(CommandType type, CommandOutput<K, V, T> output, K key) {
// addKey 会把参数 K 保存到 singularArguments 列表中,列表的成员类型是 SingularArgument
// KeyArgument 是 SingularArgument 的子类,用来存放 Key
// ValueArgument 也是 SingularArgument 的子类,用来存放 Value
CommandArgs<K, V> args = new CommandArgs<>(codec).addKey(key);
// 第一个参数类型是 CommandType
// 第二个参数类型是 CommandOutput,表示命令的输出结果
// 第三个参数类型是 CommandArgs,表示命令的参数
return createCommand(type, output, args);
}
protected <T> Command<K, V, T> createCommand(CommandType type, CommandOutput<K, V, T> output, CommandArgs<K, V> args) {
// Command
// - ProtocolKeyword,是 CommandType 的父类,用于把命令关键字转化为 byte数组
// - CommandOutput,用来保存命令的response
// - CommandArgs
// - singularArguments,保存命令的request参数
return new Command<>(type, output, args);
}
三、发送Redis命令对象
3.1 AbstractRedisAsyncCommands.dispatch
RedisAsyncCommandsImpl 是 AbstractRedisAsyncCommands 子类,所以实际上是调用了 RedisAsyncCommandsImpl 对象的 dispatch 方法:
public <T> AsyncCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
// 用 AsyncCommand 增强 Command 类:异步redis命令及其结果。
// 所有成功执行的命令最终都将返回 CommandOutput
AsyncCommand<K, V, T> asyncCommand = new AsyncCommand<>(cmd);
// StatefulRedisConnectionImpl,这是单机模式的连接对象,如果是Redis集群或者主从或者哨兵还有不同的实现
RedisCommand<K, V, T> dispatched = connection.dispatch(asyncCommand);
if (dispatched instanceof AsyncCommand) {
return (AsyncCommand<K, V, T>) dispatched;
}
return asyncCommand;
}
3.2 StatefulRedisConnectionImpl.dispatch
@Override
public <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> command) {
// AUTH / SELECT / READONLY / READWRITE / DISCARD / EXEC / MULTI
// 以上这些命令会执行该方法,其他命令什么都不做
RedisCommand<K, V, T> toSend = preProcessCommand(command);
// MULTI 命令会执行该方法,其他命令什么都不做
potentiallyEnableMulti(command);
// 继续发送
return super.dispatch(toSend);
}
3.3 RedisChannelHandler.dispatch
protected <T> RedisCommand<K, V, T> dispatch(RedisCommand<K, V, T> cmd) {
if (debugEnabled) {
logger.debug("dispatching command {}", cmd);
}
// 提供追踪功能
if (tracingEnabled) {
RedisCommand<K, V, T> commandToSend = cmd;
TraceContextProvider provider = CommandWrapper.unwrap(cmd, TraceContextProvider.class);
if (provider == null) {
commandToSend = new TracedCommand<>(cmd,
clientResources.tracing().initialTraceContextProvider().getTraceContext());
}
return channelWriter.write(commandToSend);
}
// 接下来调用 RedisChannelWriter 的写方法,发送Redis命令
return channelWriter.write(cmd);
}
3.4 RedisChannelWriter.write
RedisChannelWriter 的实现类根据Redis部署模式不同,对应的实现类也不同。例如单机对应的是 DefaultEndpoint,集群对应的是 ClusterDistributionChannelWriter。
// DefaultEndpoint.java
public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {
LettuceAssert.notNull(command, "Command must not be null");
RedisException validation = validateWrite(1);
if (validation != null) {
// 校验失败,指令异常退出
command.completeExceptionally(validation);
return command;
}
try {
sharedLock.incrementWriters();
if (inActivation) {
command = processActivationCommand(command);
}
// 是否自动刷新缓冲区
if (autoFlushCommands) {
// 判断是否与Redis服务器保持连接
if (isConnected()) {
// 最终会调用 Netty 的 NioSocketChannel 的 writeAndFlush 方法,完成命令的发送
writeToChannelAndFlush(command);
} else {
// 添加到 disconnectedBuffer 中,不立即发送
writeToDisconnectedBuffer(command);
}
} else {
// 保存到 commandBuffer 中,不立即发送
writeToBuffer(command);
}
} finally {
sharedLock.decrementWriters();
if (debugEnabled) {
logger.debug("{} write() done", logPrefix());
}
}
return command;
}
通过 Netty 的 NioSocketChannel 发送对象,自然会经过配置到的管道,管道中配置有编码器,用来把 Redis 命令对象编码为 byte数组再通过 TCP 通道发送出去。
四、把Redis命令对象编码为byte数组
通过 Netty 调用 MessageToByteEncoder
的 write(ChannelHandlerContext, Object, ChannelPromise)
方法中的
try {
this.encode(ctx, cast, buf);
} finally {
ReferenceCountUtil.release(msg);
}
CommandEncoder 是 MessageToByteEncoder 的子类,实现了抽象方法 encode
:
protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
out.touch("CommandEncoder.encode(…)");
if (msg instanceof RedisCommand) {
RedisCommand<?, ?, ?> command = (RedisCommand<?, ?, ?>) msg;
// 把单个RedisCommand编码成byte数组保存到ByteBuf中
encode(ctx, out, command);
}
// 把多个RedisCommand编码成byte数组保存到ByteBuf中
if (msg instanceof Collection) {
Collection<RedisCommand<?, ?, ?>> commands = (Collection<RedisCommand<?, ?, ?>>) msg;
for (RedisCommand<?, ?, ?> command : commands) {
encode(ctx, out, command);
}
}
}
4.1 CommandEncoder.encode
private void encode(ChannelHandlerContext ctx, ByteBuf out, RedisCommand<?, ?, ?> command) {
try {
// 保存ByteBuf写入前的位置,方便出现异常后回滚
out.markWriterIndex();
// 调用RedisCommand的encode方法,编码并写入ByteBuf
command.encode(out);
} catch (RuntimeException e) {
out.resetWriterIndex();
command.completeExceptionally(new EncoderException(
"Cannot encode command. Please close the connection as the connection state may be out of sync.", e));
}
if (debugEnabled) {
logger.debug("{} writing command {}", logPrefix(ctx.channel()), command);
if (traceEnabled) {
logger.trace("{} Sent: {}", logPrefix(ctx.channel()), out.toString(Charset.defaultCharset()).trim());
}
}
}
4.2 Command.encode
类 Command 实现了 RedisCommand 接口,其 encode 方法源码如下:
public void encode(ByteBuf buf) {
buf.touch("Command.encode(…)");
// * 在RESP协议中表示数组的第一个字节
buf.writeByte('*');
// 接下来是数组长度
CommandArgs.IntegerArgument.writeInteger(buf, 1 + (args != null ? args.count() : 0));
// 在RESP中,协议的不同部分始终以\r\n(CRLF)终止
buf.writeBytes(CommandArgs.CRLF);
// 对命令关键字编码
CommandArgs.BytesArgument.writeBytes(buf, type.getBytes());
// 对命令参数编码
if (args != null) {
args.encode(buf);
}
}
以下是 CommandArgs 内部类 BytesArgument 的 writeBytes 方法
static void writeBytes(ByteBuf buffer, byte[] value) {
// 在RESP协议中,对于大容量字符串(Bulk String),应答的第一个字节是$
buffer.writeByte('$');
// 大容量字符串的长度
IntegerArgument.writeInteger(buffer, value.length);
buffer.writeBytes(CRLF);
// 字符串的内容
buffer.writeBytes(value);
buffer.writeBytes(CRLF);
}
然后是对命令参数逐个进行编码:
public void encode(ByteBuf buf) {
buf.touch("CommandArgs.encode(…)");
// singularArguments 列表保存了命令的所有参数
for (SingularArgument singularArgument : singularArguments) {
singularArgument.encode(buf);
}
}
以 KeyArgument 的 encode 方法为例:
void encode(ByteBuf target) {
if (codec instanceof ToByteBufEncoder) {
ToByteBufEncoder<K, V> toByteBufEncoder = (ToByteBufEncoder<K, V>) codec;
// 开辟一个临时的缓冲区,用来保存 单参数 编码成功后的byte数组
// 调用 target.alloc() 是为了拿到和目标缓冲区的 ByteBufAllocator,保证分配在相同的内存区中
ByteBuf temporaryBuffer = target.alloc().buffer(toByteBufEncoder.estimateSize(key) + 6);
try {
// 比如StringCodec 就是 ToByteBufEncoder 的子类
// StringCodec 把字符串用 utf8或ascii或者其他编码方式 编码成 byte数组
toByteBufEncoder.encodeKey(key, temporaryBuffer);
// 把临时缓冲区中的byte数组写入到目标缓冲区内
ByteBufferArgument.writeByteBuf(target, temporaryBuffer);
} finally {
temporaryBuffer.release();
}
return;
}
ByteBufferArgument.writeByteBuffer(target, codec.encodeKey(key));
}