Dubbo源码解析-Dubbo服务消费者_Injvm协议(一)

前言:

    前两篇我们分析了Dubbo服务提供者,在创建时的基本配置属性,如ServiceConfig、ApplicationConfig、RegistryConfig等。基本了解了基于API方式来创建Dubbo服务提供者的套路。

    同时第二篇我们分析了在injvm(本地)模式下,dubbo服务如何向外注册(本质上还是注册在本地的map中,就是InjvmProtocol.exporterMap中),等待同进程中的服务消费者来调用(这个很重要,笔者基于Dubbo-demo源码中测试代码进行测试时候,无论怎样,消费者都会报错,找不到服务提供者,就是因为消费者和提供者不在同一个进程内)。

    下面我们就来看下基于Injvm模式下的服务消费者创建方式及源码分析。

1.Injvm模式下的服务消费者和创建者

   代码来自dubbo-2.7.7中dubbo-demo-api项目下的Application,笔者有所精简(最主要的是将服务的发布和消费放到同一个main方法中了)

public class Application {
	// 服务提供者代码有所精简,本质上还是与之前的示例一样
	public static void main(String[] args) throws Exception {
        startWithExport();
        runWithRefer();
    }

    // 服务提供者
    private static void startWithExport() throws InterruptedException {
        ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>();
        service.setInterface(DemoService.class);
        service.setRef(new DemoServiceImpl());
        service.setApplication(new ApplicationConfig("dubbo-demo-api-provider"));
        service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));

        // 只暴露在本地,当前进程内
        service.setScope("local");
        service.export();

        System.out.println("dubbo service started");
//        new CountDownLatch(1).await();
    }

	// 服务消费者
    private static void runWithRefer() {
        ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
        reference.setApplication(new ApplicationConfig("dubbo-demo-api-consumer"));
        reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
        reference.setInterface(DemoService.class);
        // 寻找本地模式的服务提供者
        reference.setScope("local");
        DemoService service = reference.get();
        String message = service.sayHello("dubbo");
        System.out.println(message);
    }
}

有关于ApplicationConfig、RegistryConfig等之前都已经有过介绍,不再赘述。我们主要来看下ReferenceConfig

1.1 ReferenceConfig

public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
 	// 在 Dubbo服务提供(Injvm)中我们有过分析,这里返回的就是Protocol$Adaptive,
    // 调用链是Protocol$Adaptive --> ProtocolFilterWrapper  --> ProtocolListenerWrapper --> InjvmProtocol
    private static final Protocol REF_PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
	// 这个暂时用不上,可以先忽略
    private static final Cluster CLUSTER = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
	// 同理,这里返回的是ProxyFactory$Adaptive,调用链是StubProxyFactoryWrapper --> JavassistProxyFactory
    private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

    // 注册的接口类
    private transient volatile T ref;
    // ReferenceConfig主要属性就是以上
    ...
}

// ReferenceConfigBase
public abstract class ReferenceConfigBase<T> extends AbstractReferenceConfig {
 
    // 接口名
    protected String interfaceName;

    //接口类信息
    protected Class<?> interfaceClass;

    /**
     * client type TODO
     */
    protected String client;

    // 用于点对点模式下指定调用的url
    protected String url;

    // 消费者配置信息(指定线程数等信息)
    protected ConsumerConfig consumer;

    // 指定协议
    protected String protocol;

    protected ServiceMetadata serviceMetadata;
}

主要属性就是上述这些,没有什么特别的东西。

有关于Protocol$Adaptive、ProxyFactory$Adaptive的内容可参考  Dubbo源码解析-Dubbo服务提供者_Injvm协议(二)_恐龙弟旺仔的博客-CSDN博客

2.Injvm消费者源码分析

2.1 ReferenceConfig.get()

public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
	public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
        }
        // 交由init()方法处理
        if (ref == null) {
            init();
        }
        return ref;
    }
    
    public synchronized void init() {
        // 初始化过的则不再初始化
        if (initialized) {
            return;
        }
		...
		// 这里主要用于设置属性 serviceMetadata
        checkAndUpdateSubConfigs();

		...
        Map<String, String> map = new HashMap<String, String>();
        map.put(SIDE_KEY, CONSUMER_SIDE);

        ReferenceConfigBase.appendRuntimeParameters(map);
        ...
        // 一系列的配置参数,不是重点,直接忽略
        map.put(INTERFACE_KEY, interfaceName);
        AbstractConfig.appendParameters(map, getMetrics());
        ...
        serviceMetadata.getAttachments().putAll(map);

        // 这里是重点
        ref = createProxy(map);

        serviceMetadata.setTarget(ref);
        serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
        ...
    }
}

这里主要是解析ApplicationConfig等配置的参数到map中,最终添加到ServiceConfig中

// 在笔者当前的示例中,map的内容如下:
	"side" -> "consumer"
    "application" -> "dubbo-demo-api-consumer"
    "register.ip" -> "xxx.xxx"
    "release" -> ""
    "methods" -> "sayHello,sayHelloAsync"
    "scope" -> "local"
    "sticky" -> "false"
    "dubbo" -> "2.0.2"
    "pid" -> "9612"
    "interface" -> "org.apache.dubbo.demo.DemoService"
    "timestamp" -> "1628079039393"

2.2 ReferenceConfig.createProxy()

public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
	private T createProxy(Map<String, String> map) {
        // 这里判断是否injvm模式,本例中时 是
        if (shouldJvmRefer(map)) {
            // 本例中生成的url 内容为:injvm://127.0.0.1/org.apache.dubbo.demo.DemoService?application=dubbo-demo-api-consumer&dubbo=2.0.2&interface=org.apache.dubbo.demo.DemoService&methods=sayHello,sayHelloAsync&pid=9612&register.ip=172.20.89.171&release=&scope=local&side=consumer&sticky=false&timestamp=1628079039393
            URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
            
            // 这里通过代理对象生成invoker,通过之前的分析我们知道,最终会调用InjvmProtocol.refer()方法,具体分析见2.2.1
            invoker = REF_PROTOCOL.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {
            // 非本地模式的不是本文重点,先忽略
        }

        // 如果当前没有设置check=false,并且Invoker没有找到合适的服务提供者(怎么找呢,见2.2.2)
        if (shouldCheck() && !invoker.isAvailable()) {
            invoker.destroy();
            throw new IllegalStateException("Failed to check the status of the service "
                    + interfaceName
                    + ". No provider available for the service "
                    + (group == null ? "" : group + "/")
                    + interfaceName +
                    (version == null ? "" : ":" + version)
                    + " from the url "
                    + invoker.getUrl()
                    + " to the consumer "
                    + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
        }
        ...
        String metadata = map.get(METADATA_KEY);
        WritableMetadataService metadataService = WritableMetadataService.getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata);
        if (metadataService != null) {
            URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
            metadataService.publishServiceDefinition(consumerURL);
        }
        // 重要的又来了,我们接着看
        return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
    }
}

2.2.1 Protocol$Adaptive.refer()

调用链依旧是 Protocol$Adaptive --> ProtocolFilterWrapper  --> ProtocolListenerWrapper --> InjvmProtocol

我们直接看InjvmProtocol.refer()方法

public abstract class AbstractProtocol implements Protocol {
 
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        // 最终由InjvmProtocol.protocolBindingRefer实现
        return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
    }
}

// InjvmProtocol.protocolBindingRefer
public class InjvmProtocol extends AbstractProtocol implements Protocol {
    public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
        // 这个exporterMap很重要,就是之前我们injvm模式下,服务提供者将当前服务注册的地方,在当前Invoker中,直接把这个map当做构造参数传入
        return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), exporterMap);
    }
}

class InjvmInvoker<T> extends AbstractInvoker<T> {
    private final Map<String, Exporter<?>> exporterMap;
    
    InjvmInvoker(Class<T> type, URL url, String key, Map<String, Exporter<?>> exporterMap) {
        super(type, url);
        this.key = key;
        this.exporterMap = exporterMap;
    }
}

重要:我们要正确的理解exporterMap,服务提供者(Injvm模式)会将当前服务添加到这个map中,key就是我们当前的接口全限定名。

现在我们在创建服务消费者(Injvm模式),会将InjvmProtocol中的这个exporterMap当做构造参数直接传入InjvmInvoker中,正好作为后续的使用。

2.2.2 invoker.isAvailable()

    本质上还是InjvmInvoker.isAvailable()方法的调用

class InjvmInvoker<T> extends AbstractInvoker<T> {
    public boolean isAvailable() {
        // 接着上面的看,就是看exporterMap是否有当前接口的服务提供者
        InjvmExporter<?> exporter = (InjvmExporter<?>) exporterMap.get(key);
        if (exporter == null) {
            return false;
        } else {
            return super.isAvailable();
        }
    }
}

2.3 PROXY_FACTORY.getProxy()

    正确的调用链就是 ProxyFactory$Adaptive --> StubProxyFactoryWrapper --> JavassistProxyFactory

    我们直接看JavassistProxyFactory.getProxy()

public class JavassistProxyFactory extends AbstractProxyFactory {
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        // 之前的文章中有分析过Proxy.getProxy,就是使用javassist动态创建代理类,最终还是调用InjvmInvoker来实现
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }
}

// Proxy.getProxy()
public abstract class Proxy {
	public static Proxy getProxy(ClassLoader cl, Class<?>... ics) {
        ...

            // create ProxyInstance class.
            String pcn = pkg + ".proxy" + id;
            ccp.setClassName(pcn);
            ccp.addField("public static java.lang.reflect.Method[] methods;");
            ccp.addField("private " + InvocationHandler.class.getName() + " handler;");
            ccp.addConstructor(Modifier.PUBLIC, new Class<?>[]{InvocationHandler.class}, new Class<?>[0], "handler=$1;");
            ccp.addDefaultConstructor();
            Class<?> clazz = ccp.toClass();
            clazz.getField("methods").set(null, methods.toArray(new Method[0]));

            // create Proxy class.
            String fcn = Proxy.class.getName() + id;
            ccm = ClassGenerator.newInstance(cl);
            ccm.setClassName(fcn);
            ccm.addDefaultConstructor();
            ccm.setSuperClass(Proxy.class);
            ccm.addMethod("public Object newInstance(" + InvocationHandler.class.getName() + " h){ return new " + pcn + "($1); }");
            Class<?> pc = ccm.toClass();
        // 还是动态生成代理那一套,我们直接看下动态生成的代理类的内容,见2.3.1
            proxy = (Proxy) pc.newInstance();
        } catch (RuntimeException e) {
            ...
        }
        return proxy;
    }
}

2.3.1 org.apache.dubbo.common.bytecode.Proxy0

    如何查看动态生成的类呢?可以参考这篇文章:https://blog.csdn.net/yfxhao123/article/details/109472166 

package org.apache.dubbo.common.bytecode;
import java.lang.reflect.InvocationHandler;

// 其继承的这个Proxy就是org.apache.dubbo.common.bytecode.Proxy抽象类,并实现了其抽象方法newInstance()
public class Proxy0
  extends Proxy
  implements ClassGenerator.DC
{
  public Object newInstance(InvocationHandler paramInvocationHandler)
  {
    return new proxy0(paramInvocationHandler);
  }
}

 

总结:我们上面示例代码中reference.get()最终返回的就是一个调用代理类Proxy(针对InjvmInvoker的代理类InvokerInvocationHandler对象)

最终,我们还是要通过InvokerInvocationHandler来真正实现调用。

2.4 InvokerInvocationHandler.invoke()

public class InvokerInvocationHandler implements InvocationHandler {
 
    // 这里传入的proxy就是 上面的InjvmInvoker对象
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if (method.getDeclaringClass() == Object.class) {
            return method.invoke(invoker, args);
        }
        String methodName = method.getName();
        Class<?>[] parameterTypes = method.getParameterTypes();
        ...
        // 创建RpcInvocation
        RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
        String serviceKey = invoker.getUrl().getServiceKey();
        rpcInvocation.setTargetServiceUniqueName(serviceKey);
      
        if (consumerModel != null) {
            rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
            rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
        }

        // 还是交由invoker执行,后续的还是熟悉的配方,调用链依次执行ProtocolFilterWrapper --> ListenerInvokerWrapper --> InjvmInvoker
        return invoker.invoke(rpcInvocation).recreate();
    }
}

中间的ProtocolFilterWrapper、ListenerInvokerWrapper,我们先不分析,后续讲到Filter时会仔细说明。

直接来看最终的调用InjvmInvoker

2.4 InjvmInvoker.doInvoker()

class InjvmInvoker<T> extends AbstractInvoker<T> {
	public Result doInvoke(Invocation invocation) throws Throwable {
        // 获取之前存放在exporterMap中的InjvmExporter
        Exporter<?> exporter = InjvmProtocol.getExporter(exporterMap, getUrl());
        if (exporter == null) {
            throw new RpcException("Service [" + key + "] not found.");
        }
        RpcContext.getContext().setRemoteAddress(LOCALHOST_VALUE, 0);
        
        // InjvmExport.invoker就是我们之前分析的,忘记的可以再翻翻看下 Dubbo服务提供(Injvm)
        return exporter.getInvoker().invoke(invocation);
    }
}

貌似又回来了,我们拿到服务提供者的InjvmExporter之后,其Invoker属性就是之前文章中说的 JavassistProxyFactory.getInvoker()方法所返回的那个AbstractProxyInvoker

2.5 调回到AbstractProxyInvoker.doInvoke()

public class JavassistProxyFactory extends AbstractProxyFactory {
	...
    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                // 最终执行到这个方法,而wrapper.invokeMethod()的调用见2.5.1
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }
}

2.5.1 Wrapper1.invokeMethod()

public class Wrapper0
  extends Wrapper
  implements ClassGenerator.DC
{
public Object invokeMethod(Object paramObject, String paramString, Class[] paramArrayOfClass, Object[] paramArrayOfObject)
    throws InvocationTargetException
  {
    DemoServiceImpl localDemoServiceImpl;
    try
    {
      localDemoServiceImpl = (DemoServiceImpl)paramObject;
    }
    catch (Throwable localThrowable1)
    {
      throw new IllegalArgumentException(localThrowable1);
    }
    try
    {
    // 在这里实现了对localDemoServiceImpl的调用,就是我们在创建ServiceConfig时setRef()方法所引用的类,本例中即为 new DemoServiceImpl()
      if ((!"sayHello".equals(paramString)) || (paramArrayOfClass.length == 1)) {
        return localDemoServiceImpl.sayHello((String)paramArrayOfObject[0]);
      }
      if ((!"sayHelloAsync".equals(paramString)) || (paramArrayOfClass.length == 1)) {
        return localDemoServiceImpl.sayHelloAsync((String)paramArrayOfObject[0]);
      }
    }
    catch (Throwable localThrowable2)
    {
      throw new InvocationTargetException(localThrowable2);
    }
    throw new NoSuchMethodException("Not found method \"" + paramString + "\" in class org.apache.dubbo.demo.provider.DemoServiceImpl.");
  }
}

总结:

    1.通过创建ReferenceConfig,在调用其get()方法时,会返回InjvmInvoker对象,该对象的exporterMap即当前InjvmProtocol下的所有服务提供者信息;

    2.通过ProxyFactory$Adaptive创建对InjvmInvoker的代理,最终调用到JavassistProxyFactory.getProxy()来实现,动态创建了对InjvmInvoker的代理类Proxy0;

    3.在调用业务方法Proxy0.sayHello()时,实际调用的是InjvmInvoker.doInvoker()方法,这里会从之前保存的exporterMap中获取对应的Exporter实现类,并获取Exporter.getInvoker()对象,调用其invoke()方法,此时我们又回到了服务端的逻辑

    4.Exporter.getInvoker()本质上就是JavassistProxyFactory.getInvoker()方法返回值,即AbstractProxyInvoker,其doInvoke()调用交由wrapper实现

    5.根据我们对Wrapper1(动态生成的对象)进行解析,可以发现,其最终调用的还是我们在ServiceConfig.setRef()中设置的实现类。

最重要的两个方法:

public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
	private T createProxy(Map<String, String> map) {
        if (shouldJvmRefer(map)) {
            URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
            // 1.生成Invoker
            invoker = REF_PROTOCOL.refer(interfaceClass, url);
        } else {
            // 非本地模式的不是本文重点,先忽略
        }

        ...
        // 2.生成Proxy
        return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
    }
}

所以,服务消费者的主要过程也就是:将消费转变为Invoker(本例中为InjvmInvoker);将Invoker转变为Proxy

最后,用一张时序图来展示下消费者(Injvm模式)全过程Dubbo源码解析-Dubbo服务消费者_Injvm协议(一)

 

上一篇:dubbo源码分析3(dubbo中的spi机制)


下一篇:Dubbo源码解析-动态编译javaAssist的使用