Rabbit RPC 代码阅读(一)

前言

Surging是一款优秀的微服务框架,以前在使用Surging及RPC的时候,往往一知半解,知其然不知其所以然,最近终于鼓起勇气将气源码进行了详解阅读,其阅读笔记如下,以作记录。

RPC介绍

Surging的RPC原理很简单,可以用3点概括:

1、服务端启动并且向注册中心发送服务信息,注册中心收到后会定时监控服务状态(常见心跳检测)。

2、客户端需要开始调用服务的时候,首先去注册中心获取服务信息。

3、客户端创建远程调用连接,连接后服务端返回处理信息。

第3步又可以细分,下面说说远程过程调用的原理:

1、目标:客户端怎么调用远程机器上的公开方法

2、服务发现,向注册中心获取服务(这里需要做的有很多:拿到多个服务时需要做负载均衡,同机房过滤、版本过滤、服务路由过滤、统一网关等);

3、客户端发起调用,将需要调用的服务、方法、参数进行组装;
序列化编码组装的消息,这里可以使用json,也可以使用xml,也可以使用protobuf,也可以使用hessian,几种方案的序列化速度还有序列化后占用字节大小都是选择的重要指标,对内笔者建议使用高效的protobuf,它基于TCP/IP二进制进行序列化,体积小,速度快。

4、传输协议,可以使用传统的IO阻塞传输,也可以使用高效的nio传输(Netty);

5、服务端收到后进行反序列化,然后进行相应的处理;

6、服务端序列化response信息并且返回;

7、客户端收到response信息并且反序列化;

来源:https://www.cnblogs.com/SteveLee/p/rpc_framework_easy.html

下面将针对以上7个步骤所涉及的代码进行遂一解析,过程是痛苦的,结果是让人欣喜的,.Net Core 原来如此强大。

一、客户端怎么调用远程机器上的公开方法

废话不多说,直接开始解析代码

using doteasy.client.Clients;

namespace doteasy.client
{
    internal static class Program
    {
        private static void Main() => RpcClient.TestNoToken();
    }
}

public static void TestNoToken()
{
    using (var proxy = ClientProxy.Generate<IProxyService>(new Uri("http://127.0.0.1:8500")))
    {
        Console.WriteLine($@"{proxy.Sync(1)}");
        Console.WriteLine($@"{proxy.Async(1).Result}");
        Console.WriteLine($@"{proxy.GetDictionaryAsync().Result["key"]}");
    }

    using (var proxy = ClientProxy.Generate<IProxyCommpoundService>(new Uri("http://127.0.0.1:8500")))
    {
        Console.WriteLine($@"{JsonConvert.SerializeObject(proxy.GetCurrentObject(new CompoundObject()))}");
    }
}

核心代码:

using (var proxy = ClientProxy.Generate(new Uri("http://127.0.0.1:8500")))

其作用为生成一个实现了IProxyService接口的代理类,接着调用远程服务器的Sync,Async,GetDictionaryAsync三个方法,IProxyService的定义如下:

public interface IProxyService : IDisposable
{
    Task<IDictionary<string, string>> GetDictionaryAsync();
    Task<string> Async(int id);
    string Sync(int id);
}

继续查看ClientProxy.Generate的定义:

public static T Generate<T>(Uri consulUrl)
{
    serviceCollection.AddLogging().AddClient().UseDotNettyTransport().UseConsulRouteManager(new RpcOptionsConfiguration
    {
        ConsulClientConfiguration = new ConsulClientConfiguration {Address = consulUrl}
    });

    //返回一个预编译的代理对象
    return Proxy<T>();
}

首先注入相应的接口及需要使用的中间件,接着返回一个预编译的代理对象,我们继续往下,查看Proxy的定义:

private static T Proxy<T>(string accessToken = "")
{
    var serviceProvider = Builder();
#if DEBUG
    serviceProvider.GetRequiredService<ILoggerFactory>().AddConsole((c, l) => (int) l >= Loglevel);
#endif
    var serviceProxyGenerate = serviceProvider.GetRequiredService<IServiceProxyGenerater>();
    var serviceProxyFactory = serviceProvider.GetRequiredService<IServiceProxyFactory>();

    if (accessToken == "")
    {
        
        return serviceProxyFactory
            .CreateProxy<T>(serviceProxyGenerate.GenerateProxys(new[] {typeof(T)})
                .ToArray()
                .Single(typeof(T).GetTypeInfo().IsAssignableFrom));
    }

    return serviceProxyFactory
        .CreateProxy<T>(serviceProxyGenerate.GenerateProxys(new[] {typeof(T)}, accessToken)
            .ToArray()
            .Single(typeof(T).GetTypeInfo().IsAssignableFrom));
}

创建一个代理对象,构造语法树及动态编译,最后返回一个指定的代理对象类。

看来核心是构造语法树及动态编译,返回代理对象类,仔细分析,核心代码为:

serviceProxyFactory
            .CreateProxy<T>(serviceProxyGenerate.GenerateProxys(new[] {typeof(T)})
                .ToArray()
                .Single(typeof(T).GetTypeInfo().IsAssignableFrom));

我们首先查看.CreateProxy的实现代码

public static T CreateProxy<T>(this IServiceProxyFactory serviceProxyFactory, Type proxyType)
{
    return (T) serviceProxyFactory.CreateProxy(proxyType);
}

public object CreateProxy(Type proxyType)
{
    return proxyType
        .GetTypeInfo()
        .GetConstructors()
        .First()
        .Invoke(
            //ServiceProxyBase类 构造函数传参
            new object[] {_remoteInvokeService, _typeConvertibleService}
        );
}

没啥特别的,返回一个代理对象类,但问题来了,这个代理对象类是怎么生成的呢,原来核心代码在这段:
serviceProxyGenerate.GenerateProxys(new[] {typeof(T)})

查看其详细代码:

public IEnumerable<Type> GenerateProxys(IEnumerable<Type> interfaceTypes)
{
    //获取程序集
    var assembles = DependencyContext.Default.RuntimeLibraries
        .SelectMany(i => i.GetDefaultAssemblyNames(DependencyContext.Default)
            .Select(z => Assembly.Load(new AssemblyName(z.Name))));

    assembles = assembles.Where(i => i.IsDynamic == false).ToArray();


    var enumerable = interfaceTypes as Type[] ?? interfaceTypes.ToArray();
    
    //构造语法树
    var trees = enumerable.Select(GenerateProxyTree).ToList();

    //编译语法树
    var stream = CompilationUnits.CompileClientProxy(trees,
        assembles
            .Select(a => MetadataReference.CreateFromFile(a.Location))
            .Concat(new[]
            {
                MetadataReference.CreateFromFile(typeof(Task).GetTypeInfo().Assembly.Location)
            }),
        enumerable.ToArray()[0],
        _logger);

    if (stream == null)
    {
        throw new ArgumentException(@"没有生成任何客户端代码", nameof(stream));
    }

    using (stream)
    {
        var className = enumerable.ToArray()[0].Name.StartsWith("I")
            ? enumerable.ToArray()[0].Name.Substring(1)
            : enumerable.ToArray()[0].Name;
        return AppDomain.CurrentDomain.GetAssemblies().Any(x => x.FullName.Contains(className))
            ? Assembly.Load(StreamToBytes(stream)).GetExportedTypes()
            : AssemblyLoadContext.Default.LoadFromStream(stream).GetExportedTypes();
    }
}

核心代码为:

var trees = enumerable.Select(GenerateProxyTree).ToList();

var stream = CompilationUnits.CompileClientProxy(trees,
assembles
    .Select(a => MetadataReference.CreateFromFile(a.Location))
    .Concat(new[]
    {
        MetadataReference.CreateFromFile(typeof(Task).GetTypeInfo().Assembly.Location)
    }),
enumerable.ToArray()[0],
_logger);

首先获取代码对象的语法树,再进行动态编译。
这里有个问题就比较突出了,这个GenerateProxyTree到底是啥?下面那句比较好理解,对trees对象进行编译并返回一个序列化的字节码。

查看GenerateProxyTree的代码

public SyntaxTree GenerateProxyTree(Type interfaceType)
{
    var className = interfaceType.Name.StartsWith("I") ? interfaceType.Name.Substring(1) : interfaceType.Name;
    className += "ClientProxy";

    var members = new List<MemberDeclarationSyntax>
    {
        GetConstructorDeclaration(className)
    };

    var interf = interfaceType.GetInterfaces()[0];
    var mthods = interfaceType.GetMethods();
    if (interf.FullName != null && interf.FullName.Contains("IDisposable"))
    {
        var m = interf.GetMethods()[0];
        var mm = mthods.ToList();
        mm.Add(m);
        mthods = mm.ToArray();
    }

    members.AddRange(GenerateMethodDeclarations(mthods));

    return SyntaxFactory.CompilationUnit().WithUsings(GetUsings()).WithMembers(
        SyntaxFactory.SingletonList<MemberDeclarationSyntax>(
            SyntaxFactory.NamespaceDeclaration(
                SyntaxFactory.QualifiedName(
                    SyntaxFactory.QualifiedName(
                        SyntaxFactory.IdentifierName("Rpc"),
                        SyntaxFactory.IdentifierName("Common")),
                    SyntaxFactory.IdentifierName("ClientProxys"))).WithMembers(
                SyntaxFactory.SingletonList<MemberDeclarationSyntax>(
                    SyntaxFactory.ClassDeclaration(className).WithModifiers(
                        SyntaxFactory.TokenList(SyntaxFactory.Token(SyntaxKind.PublicKeyword))).WithBaseList(
                        SyntaxFactory.BaseList(
                            SyntaxFactory.SeparatedList<BaseTypeSyntax>(new SyntaxNodeOrToken[]
                            {
                                SyntaxFactory.SimpleBaseType(
                                    SyntaxFactory.IdentifierName("ServiceProxyBase")),
                                SyntaxFactory.Token(SyntaxKind.CommaToken),
                                SyntaxFactory.SimpleBaseType(GetQualifiedNameSyntax(interfaceType))
                            }))).WithMembers(
                        SyntaxFactory.List(members)))))).NormalizeWhitespace().SyntaxTree;

}
        

这段代码都干了些啥?仔细分析,必须先搞明白两个东西

1、SyntaxFactory 类是干什么的,有什么作用?

查了下官网:
A class containing factory methods for constructing syntax nodes, tokens and trivia.
构造语法树的类,这是什么东西,在网上查了查,知道了个大概
https://johnkoerner.com/csharp/creating-code-using-the-syntax-factory/

static void Main(string[] args)
{
    var console = SyntaxFactory.IdentifierName("Console");
    var writeline = SyntaxFactory.IdentifierName("WriteLine");
    var memberaccess = SyntaxFactory.MemberAccessExpression(SyntaxKind.SimpleMemberAccessExpression, console, writeline);

    var argument = SyntaxFactory.Argument(SyntaxFactory.LiteralExpression(SyntaxKind.StringLiteralExpression, SyntaxFactory.Literal("A")));
    var argumentList = SyntaxFactory.SeparatedList(new[] { argument });

    var writeLineCall =
        SyntaxFactory.ExpressionStatement(
        SyntaxFactory.InvocationExpression(memberaccess,
        SyntaxFactory.ArgumentList(argumentList)));

    var text = writeLineCall.ToFullString();

    Console.WriteLine(text);
    Console.ReadKey();
}

代码拉出来运行了下,心里已经有谱了,这不会就是像JS那样动态的在页面输出JS,JS动态执行吧。SyntaxFactory就是构造C#语法树的类,所有输入的代码都可以转化成相应的对象语法节点。

2、return的类型为SyntaxTree,这个是干什么的,有什么作用?

最终返回的是一个SyntaxTree,先查下官网的解释:

The parsed representation of a source document

再看看有哪些方法:
GetText(CancellationToken) Gets the text of the source document。

构造的语法树,返回相应的源代码,好,我们把代码进行相应的改动,看看返回的是什么东西:

SyntaxTree GPT = SyntaxFactory.CompilationUnit().WithUsings(GetUsings()).WithMembers(
        SyntaxFactory.SingletonList<MemberDeclarationSyntax>(
            SyntaxFactory.NamespaceDeclaration(
                SyntaxFactory.QualifiedName(
                    SyntaxFactory.QualifiedName(
                        SyntaxFactory.IdentifierName("Rpc"),
                        SyntaxFactory.IdentifierName("Common")),
                    SyntaxFactory.IdentifierName("ClientProxys"))).WithMembers(
                SyntaxFactory.SingletonList<MemberDeclarationSyntax>(
                    SyntaxFactory.ClassDeclaration(className).WithModifiers(
                        SyntaxFactory.TokenList(SyntaxFactory.Token(SyntaxKind.PublicKeyword))).WithBaseList(
                        SyntaxFactory.BaseList(
                            SyntaxFactory.SeparatedList<BaseTypeSyntax>(new SyntaxNodeOrToken[]
                            {
                                SyntaxFactory.SimpleBaseType(
                                    SyntaxFactory.IdentifierName("ServiceProxyBase")),
                                SyntaxFactory.Token(SyntaxKind.CommaToken),
                                SyntaxFactory.SimpleBaseType(GetQualifiedNameSyntax(interfaceType))
                            }))).WithMembers(
                        SyntaxFactory.List(members)))))).NormalizeWhitespace().SyntaxTree;

    string SourceText = GPT.GetText().ToString();
    return GPT;

执行,看看SourceText是啥:

using System;
using System.Threading.Tasks;
using System.Collections.Generic;
using DotEasy.Rpc.Core.Runtime.Communally.Convertibles;
using DotEasy.Rpc.Core.Runtime.Client;
using DotEasy.Rpc.Core.Runtime.Communally.Serialization;
using DotEasy.Rpc.Core.Proxy.Impl;

namespace Rpc.Common.ClientProxys
{
    public class ProxyServiceClientProxy : ServiceProxyBase, doteasy.rpc.interfaces.IProxyService
    {
        public ProxyServiceClientProxy(IRemoteInvokeService remoteInvokeService, ITypeConvertibleService typeConvertibleService): base (remoteInvokeService, typeConvertibleService)
        {
        }

        public async Task<IDictionary<System.String, System.String>> GetDictionaryAsync()
        {
            return await InvokeAsync<IDictionary<System.String, System.String>>(new Dictionary<string, object>{}, "doteasy.rpc.interfaces.IProxyService.GetDictionaryAsync");
        }

        public async Task<System.String> Async(System.Int32 id)
        {
            return await InvokeAsync<System.String>(new Dictionary<string, object>{{"id", id}}, "doteasy.rpc.interfaces.IProxyService.Async_id");
        }

        public System.String Sync(System.Int32 id)
        {
            return Invoke<System.String>(new Dictionary<string, object>{{"id", id}}, "doteasy.rpc.interfaces.IProxyService.Sync_id");
        }

        public void Dispose()
        {
        }
    }
}

看完后,确定SyntaxFactory及SyntaxTree的作用就是构造C#源码了。

最后返回的是一个构造好的SyntaxTree对象。

var stream = CompilationUnits.CompileClientProxy(trees,
assembles
    .Select(a => MetadataReference.CreateFromFile(a.Location))
    .Concat(new[]
    {
        MetadataReference.CreateFromFile(typeof(Task).GetTypeInfo().Assembly.Location)
    }),
enumerable.ToArray()[0],
_logger);
        

CompilationUnits.CompileClientProxy 方法对 SyntaxTree 对象进行编译

public static MemoryStream CompileClientProxy(IEnumerable<SyntaxTree> trees, 
            IEnumerable<MetadataReference> references, Type interfaceType, 
            ILogger logger = null)
{
    references = new[]
    {
        MetadataReference.CreateFromFile(typeof(string).GetTypeInfo().Assembly.Location),
        MetadataReference.CreateFromFile(typeof(Task).GetTypeInfo().Assembly.Location),
        MetadataReference.CreateFromFile(typeof(ServiceDescriptor).GetTypeInfo().Assembly.Location),
        MetadataReference.CreateFromFile(typeof(IRemoteInvokeService).GetTypeInfo().Assembly.Location),
        MetadataReference.CreateFromFile(typeof(IServiceProxyGenerater).GetTypeInfo().Assembly.Location)
    }.Concat(references);
    
    var className = interfaceType.Name.StartsWith("I") ? interfaceType.Name.Substring(1) : interfaceType.Name;
    return Compile(AssemblyInfo.Create($"DotEasy.Rpc.{className}Proxys"), trees, references, logger);
}

public static MemoryStream Compile(AssemblyInfo assemblyInfo, IEnumerable<SyntaxTree> trees, IEnumerable<MetadataReference> references,
            ILogger logger = null) => Compile(assemblyInfo.Title, assemblyInfo, trees, references, logger);

public static MemoryStream Compile(string assemblyName, AssemblyInfo assemblyInfo, IEnumerable<SyntaxTree> trees,
    IEnumerable<MetadataReference> references, ILogger logger = null)
{
    trees = trees.Concat(new[] {GetAssemblyInfo(assemblyInfo)});
    var compilation = CSharpCompilation.Create(assemblyName, trees, references, new CSharpCompilationOptions(OutputKind.DynamicallyLinkedLibrary));

    var stream = new MemoryStream();
    var result = compilation.Emit(stream);
    if (!result.Success && logger != null)
    {
        foreach (var message in result.Diagnostics.Select(i => i.ToString()))
        {
            logger.LogError(message);
            Console.WriteLine(message);
        }

        return null;
    }

    stream.Seek(0, SeekOrigin.Begin);
    return stream;
}
        

最终返回一个MemoryStream流,再接着往下看:

var stream = CompilationUnits.CompileClientProxy(trees,
    assembles
        .Select(a => MetadataReference.CreateFromFile(a.Location))
        .Concat(new[]
        {
            MetadataReference.CreateFromFile(typeof(Task).GetTypeInfo().Assembly.Location)
        }),
    enumerable.ToArray()[0],
    _logger);
    
    if (stream == null)
    {
    throw new ArgumentException(@"没有生成任何客户端代码", nameof(stream));
    }
    
    using (stream)
    {
    var className = enumerable.ToArray()[0].Name.StartsWith("I")
        ? enumerable.ToArray()[0].Name.Substring(1)
        : enumerable.ToArray()[0].Name;
    return AppDomain.CurrentDomain.GetAssemblies().Any(x => x.FullName.Contains(className))
        ? Assembly.Load(StreamToBytes(stream)).GetExportedTypes()
        : AssemblyLoadContext.Default.LoadFromStream(stream).GetExportedTypes();
    }
            

最后根据 MemoryStream 生成我们所需要的代理类。

至此,提交前的流程基本弄清楚。

接下来会分析:

protected async Task<T> InvokeAsync<T>(IDictionary<string, object> parameters, string serviceId)
{
    var message = await _remoteInvokeService.InvokeAsync(new RemoteInvokeContext
    {
        InvokeMessage = new RemoteInvokeMessage
        {
            Parameters = parameters,
            ServiceId = serviceId
        }
    });

    if (message == null) return default(T);

    var result = _typeConvertibleService.Convert(message.Result, typeof(T));

    return (T) result;
}

/// <summary>
/// 非异步远程调用
/// </summary>
/// <param name="parameters"></param>
/// <param name="serviceId"></param>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
// ReSharper disable once UnusedMember.Global
protected T Invoke<T>(IDictionary<string, object> parameters, string serviceId)
{
    var message = _remoteInvokeService.InvokeAsync(new RemoteInvokeContext
    {
        InvokeMessage = new RemoteInvokeMessage
        {
            Parameters = parameters,
            ServiceId = serviceId
        }
    }).Result;

    if (message == null) return default(T);
    var result = _typeConvertibleService.Convert(message.Result, typeof(T));
    return (T) result;
}

具体是如何调用的.

上一篇:rabbit原理及项目应用


下一篇:docker rabbitmq