.NET Core微服务学习-DotNetty

DotNetty介绍:

DotNetty是Azure团队仿照(几乎可以这么说)JAVA的Netty而出来的(目前已实现Netty的一部分),目前在Github上的Star有1.8K+,

地址:https://github.com/Azure/DotNetty,没有任何文档,和代码中少量的注释。虽然比Netty出来晚了很多年,不过我们NET程序员们也该庆幸了,在自己的平台上终于能用上类似Netty这样强大的通信框架了。

Netty 是一个提供 asynchronous event-driven (异步事件驱动)的网络应用框架,是一个用以快速开发高性能、可扩展协议的服务器和客户端。

  换句话说,Netty 是一个 NIO 客户端服务器框架,使用它可以快速简单地开发网络应用程序,比如服务器和客户端的协议。Netty 大大简化了网络程序的开发过程比如 TCP 和 UDP 的 socket 服务的开发。

“快速和简单”并不意味着应用程序会有难维护和性能低的问题,Netty 是一个精心设计的框架,它从许多协议的实现中吸收了很多的经验比如 FTP、SMTP、HTTP、许多二进制和基于文本的传统协议.因此,Netty 已经成功地找到一个方式,在不失灵活性的前提下来实现开发的简易性,高性能,稳定性。

  有一些用户可能已经发现其他的一些网络框架也声称自己有同样的优势,所以你可能会问是 Netty 和它们的不同之处。答案就是 Netty 的哲学设计理念。Netty 从开始就为用户提供了用户体验最好的 API 以及实现设计。正是因为 Netty 的哲学设计理念,才让您得以轻松地阅读本指南并使用 Netty。

DotNetty中几个重要的库(程序集):

DotNetty.Buffers: 对内存缓冲区管理的封装。

DotNetty.Codecs: 对编解码是封装,包括一些基础基类的实现,我们在项目中自定义的协议,都要继承该项目的特定基类和实现。

DotNetty.Codecs.Mqtt: MQTT(消息队列遥测传输)编解码是封装,包括一些基础基类的实现。

DotNetty.Codecs.Protobuf: Protobuf 编解码是封装,包括一些基础基类的实现。

DotNetty.Codecs.ProtocolBuffers: ProtocolBuffers编解码是封装,包括一些基础基类的实现。

DotNetty.Codecs.Redis: Redis 协议编解码是封装,包括一些基础基类的实现。

DotNetty.Common: 公共的类库项目,包装线程池,并行任务和常用帮助类的封装。

DotNetty.Handlers: 封装了常用的管道处理器,比如Tls编解码,超时机制,心跳检查,日志等。

DotNetty.Transport: DotNetty核心的实现,Socket基础框架,通信模式:异步非阻塞。

DotNetty.Transport.Libuv: DotNetty自己实现基于Libuv (高性能的,事件驱动的I/O库) 核心的实现。

常用的库有Codecs, Common, Handlers, Buffers, Transport,目前Azure团队正在实现其他Netty中的API(包括非公共Netty的API),让我们拭目以待吧。

服务化原理可以分为3步:

  1. 服务端启动并且向注册中心发送服务信息,注册中心收到后会定时监控服务状态(常见心跳检测);

  2. 客户端需要开始调用服务的时候,首先去注册中心获取服务信息;

  3. 客户端创建远程调用连接,连接后服务端返回处理信息;

第3步又可以细分,下面说说远程过程调用的原理:

目标:客户端怎么调用远程机器上的公开方法

  1. 服务发现,向注册中心获取服务(这里需要做的有很多:拿到多个服务时需要做负载均衡,同机房过滤、版本过滤、服务路由过滤、统一网关等);

  2. 客户端发起调用,将需要调用的服务、方法、参数进行组装;

  3. 序列化编码组装的消息,这里可以使用json,也可以使用xml,也可以使用protobuf,也可以使用hessian,几种方案的序列化速度还有序列化后占用字节大小都是选择的重要指标,对内笔者建议使用高效的protobuf,它基于TCP/IP二进制进行序列化,体积小,速度快。

  4. 传输协议,可以使用传统的io阻塞传输,也可以使用高效的nio传输(Netty);

  5. 服务端收到后进行反序列化,然后进行相应的处理;

  6. 服务端序列化response信息并且返回;

  7. 客户端收到response信息并且反序列化;

  正如上面第三步的第4条所提到,C类向S类调用时,可以选择RPC或者RESTful,而作为内部通讯,笔者强烈建议使用RPC的方式去调用S类上的所有服务,RPC对比RESTful如下:

优点:

  1. 序列化采用二进制消息,性能好/效率高(空间和时间效率都很不错);

  2. 序列化反序列化直接对应程序中的数据类,不需要解析后在进行映射(XML,JSON都是这种方式);

  3. 相比http协议,没有无用的header,简化传输数据的大小,且基于TCP层传输,速度更快,容量更小;

  4. Netty等一些框架集成(重点,也是本篇介绍的主要框架);

缺点:

  1. 使用复杂,维护成本和学习成本较高,调试困难;

  2. 因为基于HTTP2,绝大部多数HTTP Server、Nginx都尚不支持,即Nginx不能将GRPC请求作为HTTP请求来负载均衡,而是作为普通的TCP请求。(nginx1.9版本已支持);

  3. 二进制可读性差,或者几乎没有任何直接可读性,需要专门的工具进行反序列化;

  4. 默认不具备动态特性(可以通过动态定义生成消息类型或者动态编译支持,后续会介绍利用Rosyln进行动态编译的特性)

DotNetty 实现rpc微服务:

namespace Echo.Server

{

using System;

using System.Threading.Tasks;

using DotNetty.Codecs;

using DotNetty.Handlers.Logging;

using DotNetty.Transport.Bootstrapping;

using DotNetty.Transport.Channels;

using DotNetty.Transport.Libuv;

using Examples.Common;

static class Program

{

static async Task RunServerAsync()

{

ExampleHelper.SetConsoleLogger();

// 申明一个主回路调度组

var dispatcher = new DispatcherEventLoopGroup();

/*

Netty 提供了许多不同的 EventLoopGroup 的实现用来处理不同的传输。

在这个例子中我们实现了一个服务端的应用,因此会有2个 NioEventLoopGroup 会被使用。

第一个经常被叫做‘boss’,用来接收进来的连接。第二个经常被叫做‘worker’,用来处理已经被接收的连接,一旦‘boss’接收到连接,就会把连接信息注册到‘worker’上。

如何知道多少个线程已经被使用,如何映射到已经创建的 Channel上都需要依赖于 IEventLoopGroup 的实现,并且可以通过构造函数来配置他们的关系。

*/

// 主工作线程组,设置为1个线程

IEventLoopGroup bossGroup = dispatcher; // (1)

// 子工作线程组,设置为1个线程

IEventLoopGroup workerGroup = new WorkerEventLoopGroup(dispatcher);

try

{

// 声明一个服务端Bootstrap,每个Netty服务端程序,都由ServerBootstrap控制,通过链式的方式组装需要的参数

var serverBootstrap = new ServerBootstrap(); // (2)

// 设置主和工作线程组

serverBootstrap.Group(bossGroup, workerGroup);

if (ServerSettings.UseLibuv)

{

// 申明服务端通信通道为TcpServerChannel

serverBootstrap.Channel<TcpServerChannel>(); // (3)

}

serverBootstrap

// 设置网络IO参数等

.Option(ChannelOption.SoBacklog, 100) // (5)

// 在主线程组上设置一个打印日志的处理器

.Handler(new LoggingHandler("SRV-LSTN"))

// 设置工作线程参数

.ChildHandler(

/*

* ChannelInitializer 是一个特殊的处理类,他的目的是帮助使用者配置一个新的 Channel。

* 也许你想通过增加一些处理类比如DiscardServerHandler 来配置一个新的 Channel 或者其对应的ChannelPipeline 来实现你的网络程序。

* 当你的程序变的复杂时,可能你会增加更多的处理类到 pipline 上,然后提取这些匿名类到最顶层的类上。

*/

new ActionChannelInitializer<IChannel>( // (4)

channel =>

{

/*

* 工作线程连接器是设置了一个管道,服务端主线程所有接收到的信息都会通过这个管道一层层往下传输,

* 同时所有出栈的消息 也要这个管道的所有处理器进行一步步处理。

*/

IChannelPipeline pipeline = channel.Pipeline;

// 添加日志拦截器

pipeline.AddLast(new LoggingHandler("SRV-CONN"));

// 添加出栈消息,通过这个handler在消息顶部加上消息的长度。

// LengthFieldPrepender(2):使用2个字节来存储数据的长度。

pipeline.AddLast("framing-enc", new LengthFieldPrepender(2));

/*

入栈消息通过该Handler,解析消息的包长信息,并将正确的消息体发送给下一个处理Handler

1,InitialBytesToStrip = 0,       //读取时需要跳过的字节数

2,LengthAdjustment = -5,         //包实际长度的纠正,如果包长包括包头和包体,则要减去Length之前的部分

3,LengthFieldLength = 4,         //长度字段的字节数 整型为4个字节

4,LengthFieldOffset = 1,         //长度属性的起始(偏移)位

5,MaxFrameLength = int.MaxValue, //最大包长

*/

pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));

// 业务handler

pipeline.AddLast("echo", new EchoServerHandler());

}));

// bootstrap绑定到指定端口的行为就是服务端启动服务,同样的Serverbootstrap可以bind到多个端口

IChannel boundChannel = await serverBootstrap.BindAsync(ServerSettings.Port); // (6)

Console.WriteLine("wait the client input");

Console.ReadLine();

// 关闭服务

await boundChannel.CloseAsync();

}

finally

{

// 释放指定工作组线程

await Task.WhenAll( // (7)

bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)),

workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1))

);

}

}

static void Main() => RunServerAsync().Wait();

}

}

  1. IEventLoopGroup 是用来处理I/O操作的多线程事件循环器,DotNetty 提供了许多不同的 EventLoopGroup 的实现用来处理不同的传输。在这个例子中我们实现了一个服务端的应用,因此会有2个 IEventLoopGroup 会被使用。第一个经常被叫做‘boss’,用来接收进来的连接。第二个经常被叫做‘worker’,用来处理已经被接收的连接,一旦‘boss’接收到连接,就会把连接信息注册到‘worker’上。

  2. ServerBootstrap 是一个启动 Transport 服务的辅助启动类。你可以在这个服务中直接使用 Channel,但是这会是一个复杂的处理过程,在很多情况下你并不需要这样做。

  3. 这里我们指定使用 TcpServerChannel类来举例说明一个新的 Channel 如何接收进来的连接。

  4. ChannelInitializer 是一个特殊的处理类,他的目的是帮助使用者配置一个新的 Channel,当你的程序变的复杂时,可能你会增加更多的处理类到 pipline 上,然后提取这些匿名类到最顶层的类上。

  5. 你可以设置这里指定的 Channel 实现的配置参数。我们正在写一个TCP/IP 的服务端,因此我们被允许设置 socket 的参数选项比如tcpNoDelay 和 keepAlive。

  6. 绑定端口然后启动服务,这里我们在机器上绑定了机器网卡上的设置端口,当然现在你可以多次调用 bind() 方法(基于不同绑定地址)。

  7. 使用完成后,优雅的释放掉指定的工作组线程,当然,你可以选择关闭程序,但这并不推荐。

Server端的事件处理代码:

上一部分代码中加粗地方的实现

namespace Echo.Server

{

using System;

using System.Text;

using DotNetty.Buffers;

using DotNetty.Transport.Channels;

/// <summary>

/// 服务端处理事件函数

/// </summary>

public class EchoServerHandler : ChannelHandlerAdapter // ChannelHandlerAdapter 业务继承基类适配器 // (1)

{

/// <summary>

/// 管道开始读

/// </summary>

/// <param name="context"></param>

/// <param name="message"></param>

public override void ChannelRead(IChannelHandlerContext context, object message) // (2)

{

if (message is IByteBuffer buffer)    // (3)

{

Console.WriteLine("Received from client: " + buffer.ToString(Encoding.UTF8));

}

context.WriteAsync(message); // (4)

}

/// <summary>

/// 管道读取完成

/// </summary>

/// <param name="context"></param>

public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); // (5)

/// <summary>

/// 出现异常

/// </summary>

/// <param name="context"></param>

/// <param name="exception"></param>

public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)

{

Console.WriteLine("Exception: " + exception);

context.CloseAsync();

}

}

}

  1. DiscardServerHandler 继承自 ChannelInboundHandlerAdapter,这个类实现了IChannelHandler接口,IChannelHandler提供了许多事件处理的接口方法,然后你可以覆盖这些方法。现在仅仅只需要继承 ChannelInboundHandlerAdapter 类而不是你自己去实现接口方法。

  2. 这里我们覆盖了 chanelRead() 事件处理方法。每当从客户端收到新的数据时,这个方法会在收到消息时被调用,这个例子中,收到的消息的类型是 ByteBuf。

  3. 为了响应或显示客户端发来的信息,为此,我们将在控制台中打印出客户端传来的数据。

  4. 然后,我们将客户端传来的消息通过context.WriteAsync写回到客户端。

  5. 当然,步骤4只是将流缓存到上下文中,并没执行真正的写入操作,通过执行Flush将流数据写入管道,并通过context传回给传来的客户端。

Client端代码:

重点看注释的地方,其他地方跟Server端没有任何区别

namespace Echo.Client

{

using System;

using System.Net;

using System.Text;

using System.Threading.Tasks;

using DotNetty.Buffers;

using DotNetty.Codecs;

using DotNetty.Handlers.Logging;

using DotNetty.Transport.Bootstrapping;

using DotNetty.Transport.Channels;

using DotNetty.Transport.Channels.Sockets;

using Examples.Common;

static class Program

{

static async Task RunClientAsync()

{

ExampleHelper.SetConsoleLogger();

var group = new MultithreadEventLoopGroup();

try

{

var bootstrap = new Bootstrap();

bootstrap

.Group(group)

.Channel<TcpSocketChannel>()

.Option(ChannelOption.TcpNodelay, true)

.Handler(

new ActionChannelInitializer<ISocketChannel>(

channel =>

{

IChannelPipeline pipeline = channel.Pipeline;

pipeline.AddLast(new LoggingHandler());

pipeline.AddLast("framing-enc", new LengthFieldPrepender(2));

pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(ushort.MaxValue, 0, 2, 0, 2));

pipeline.AddLast("echo", new EchoClientHandler());

}));

IChannel clientChannel = await bootstrap.ConnectAsync(new IPEndPoint(ClientSettings.Host, ClientSettings.Port));

// 建立死循环,类同于While(true)

for (;;) // (4)

{

Console.WriteLine("input you data:");

// 根据设置建立缓存区大小

IByteBuffer initialMessage = Unpooled.Buffer(ClientSettings.Size); // (1)

string r = Console.ReadLine();

// 将数据流写入缓冲区

initialMessage.WriteBytes(Encoding.UTF8.GetBytes(r ?? throw new InvalidOperationException())); // (2)

// 将缓冲区数据流写入到管道中

await clientChannel.WriteAndFlushAsync(initialMessage); // (3)

if(r.Contains("bye"))

break;

}

Console.WriteLine("byebye");

await clientChannel.CloseAsync();

}

finally

{

await group.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1));

}

}

static void Main() => RunClientAsync().Wait();

}

}

  1. 初始化一个缓冲区的大小。

  2. 默认缓冲区接受的数据类型为bytes[],当然这样也更加便于序列化成流。

  3. 将缓冲区的流直接数据写入到Channel管道中。该管道一般为链接通讯的另一端(C端)。

  4. 建立死循环,这样做的目的是为了测试每次都必须从客户端输入的数据,通过服务端回路一次后,再进行下一次的输入操作。

Client端的事件处理代码:

namespace Echo.Client

{

using System;

using System.Text;

using DotNetty.Buffers;

using DotNetty.Transport.Channels;

public class EchoClientHandler : ChannelHandlerAdapter

{

readonly IByteBuffer initialMessage;

public override void ChannelActive(IChannelHandlerContext context) => context.WriteAndFlushAsync(this.initialMessage);

public override void ChannelRead(IChannelHandlerContext context, object message)

{

if (message is IByteBuffer byteBuffer)

{

Console.WriteLine("Received from server: " + byteBuffer.ToString(Encoding.UTF8));

}

}

public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush();

public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)

{

Console.WriteLine("Exception: " + exception);

context.CloseAsync();

}

}

}

上一篇:.NET Core微服务系列基础文章


下一篇:一步一步学习ABP项目系列文章目录