dubbo之服务本地暴露

写在前面

服务提供者提供的服务标记了@Service注解的类,想要被服务消费者使用,必须将服务暴露出去,即让服务消费者拿到封装服务信息的com.alibaba.dubbo.common.URL对象字符串,当前服务暴露的方式有如下三种:

远程暴露:即将服务信息注册到远端的注册中心,如配置<dubbo:service scope="remote" />。
本地暴露:JVM内部调用,因为信息已经在内存中,通过内存可以直接获取调用信息,因此叫做本地暴露,如配置<dubbo:service scope="local">。
不暴露:不暴露服务,可以忽略这种方式,如配置<dubbo:service scope="none">。

本文来分享的是本地暴露,相关的源码在模块dubbo-rpc-injmv中,如下图:

dubbo之服务本地暴露

dubbo之服务提供者配置 一文中,我们其实分析了部分服务暴露的内容,大家可以看下,本文为了承接,会有部分内容的重叠,我们就从方法com.alibaba.dubbo.config.ServiceConfig.doExportUrls来开始分析。

1:doExportUrls

源码如下:

class FakeCls {
    private void doExportUrls() {
        // 2022-01-21 18:25:43
        List<URL> registryURLs = loadRegistries(true);
        // 循环所有的协议暴露服务到所有的注册中心地址
        // 协议:protocols,即<dubbo:protocol>设置
        // 服务:通过<dubbo:service>设置
        // 注册中心地址:registryURLs,通过<dubbo:registry>指定
        for (ProtocolConfig protocolConfig : protocols) {
            // 2022-01-21 18:34:22
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }
}

2022-01-21 18:25:43处获取所有配置的注册中心的地址,具体参考1.1:loadRegistries2022-01-21 18:34:22处是将服务按照指定的协议注册到注册中心,具体参考1.2:doExportUrlsFor1Protocol

1.1:loadRegistries

本文讲解的时本地服务暴漏,不会使用到这里的信息,但是为了完整性,放在这里,对这部分感兴趣的朋友可以参考dubbo之服务远程暴露 文章分析。

1.2:doExportUrlsFor1Protocol

将服务按照指定的协议注册到注册中心,分为远程暴漏和本地暴漏,其中本地暴漏不会注册服务到注册中心,因为是同一个JVM,信息可以直接从JVM中获取到,因为本文重点分析的是本地服务暴漏,所以关于远程暴漏的相关源码会选择性忽略,关于这部分的分析,可以才参考dubbo之服务远程暴露 文章分析。源码如下:

class FakeCls {
    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        // 协议的名称,如<dubbo:protocol name="dubbo" port="20826">,这里就是dubbo
        // 协议:就是暴漏自己的方法
        String name = protocolConfig.getName();
        // 没有则默认使用dubbo
        if (name == null || name.length() == 0) {
            name = "dubbo";
        }
        //*** 省略构建URL相关代码 ***//
        // 获取scope,如果是本地暴漏的话配置如:<dubbo:service interface="dongshi.daddy.service.scopelocal.ScopeLocalService" ref="scopeLocalService" scope="local"/>
        // 在文章开头也提到了可配置为remote,代表远程暴漏,none代表不爆露
        String scope = url.getParameter(Constants.SCOPE_KEY);
        // 如果是配置scope="none",不进行任何操作,此时不进行暴漏,即不对外使用
        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
            // 如果是scope不是remote则使用本地服务暴漏
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                // 2022-01-22 12:25:51
                exportLocal(url);
            }
            // 如果是scope不是local则使用远程服务暴漏
            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
                // *** 省略远程服务暴漏逻辑 *** //
            }
        }
        // 添加暴漏服务url
        this.urls.add(url);
    }
}

2022-01-22 12:25:51处是本地服务暴漏,具体参考1.3:exportLocal

1.3:exportLocal

源码如下:

class FakeCls {
    private void exportLocal(URL url) {
        // url.getProtocol():一般是dubbo 
        // Constants.LOCAL_PROTOCOL:injvm
        // 为什么做这个判断???
        if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
            // 构建local的URL,如
            // injvm://127.0.0.1/dongshi.daddy.service.scopelocal.ScopeLocalService?accesslog=true&anyhost=true&application=dongshidaddy-provider&bean.name=dongshi.daddy.service.scopelocal.ScopeLocalService&bind.ip=192.168.2.107&bind.port=20826&dubbo=2.0.2&generic=false&interface=dongshi.daddy.service.scopelocal.ScopeLocalService&methods=sayHi&owner=dongshidaddy&pid=6324&scope=local&side=provider&timestamp=1642823993714
            URL local = URL.valueOf(url.toFullString())
                    .setProtocol(Constants.LOCAL_PROTOCOL)
                    .setHost(LOCALHOST)
                    .setPort(0);
            // public static final String SERVICE_IMPL_CLASS = "service.classimpl";
            // url.getServiceKey():dongshi.daddy.service.scopelocal.ScopeLocalService
            // getServiceClass(ref:class dongshi.daddy.service.scopelocal.ScopeLocalServiceImpl
            // 将服务类的信息存储到StaticContext中
            StaticContext.getContext(Constants.SERVICE_IMPL_CLASS).put(url.getServiceKey(), getServiceClass(ref));
            // 2022-01-22 17:21:22
            Exporter<?> exporter = protocol.export(
                    proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
            exporters.add(exporter);
            logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry");
        }
    }
}

该小节以下部分稍微有点绕,大家吃耐心,不懂的话,多看几遍!!!

2022-01-22 17:21:22处protocol定义为private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();,可以看到是获取自适应扩展类 ,其中从Protocol接口也可以看出来,源码如下:

/**
 * Protocol. (API/SPI, Singleton, ThreadSafe)
 */
@SPI("dubbo")
public interface Protocol {
    int getDefaultPort();
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
    void destroy();
}

可以看到export方法是标注了@Adaptive 注解的,此处protocol
Protocol#Adaptive,这个很好理解,因为获取就是动态生成的自适应子类,通过其进行最终调用真正的扩展类实现,那么想要知道调用的到底是谁就需要知道生成的代码到是什么样子的,我们可以通过如下的步骤来获取其内容:

在com.alibaba.dubbo.common.extension.ExtensionLoader.createAdaptiveExtensionClass中的代码ClassLoader classLoader = findClassLoader();添加条件变量"code.contains("Protocol$Adaptive")",然后再次运行程序,就可以停止在这里,获取code的内容了。

如下是我获取的内容:

public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
    public void destroy() {
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }

    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }
}

我们重点关注其中的export方法,可以看到是调用url.getProtocol()作为目标扩展类的名称,那么是什么值呢?我们的url为injvm://127.0.0.1/...可以看到协议是injvm,那么对应的扩展类是谁呢,可以从文件META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol中找到答案,其中key为injvm的的配置项内容是injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol,因此最终调用的扩展类类是com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol,但是真的是这样吗?我们来debug看一下,如下图:

dubbo之服务本地暴露

从图中可以看出,还分别调用了QosProtocolWrapperProtocolListenerWrapper,ProtocolFilterWrapper,这是Protocol的Wrapper类,我们从META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol中可以看出来,如下图:

dubbo之服务本地暴露

关于Wrapper详细可以参考dubbo之SPI Wrapper分析

最终调用过程为Protocol$Adaptive->QosProtocolWrapper->ProtocolListenerWrapper->ProtocolFilterWrapper->InjvmProtocol。具体的我们在2:Protocol分析。

2:Protocol

源码如下:

@SPI("dubbo")
public interface Protocol {

    // 获取当前协议在没有配置端口时的默认端口号
    int getDefaultPort();

    // 暴漏service供远程调用
    // 1:协议对象需要在收到一个请求后记录远程源的的地址,通过API RpcContext.getContext().setRemoteAddress()
    // 2:该方法必须具备幂等性(idempotent [aɪ'dempətənt]),即通过该方法调用一次或者是多次来暴漏一个URL没有任何差别
    // 3:Invoker实例需要被框架传入进来,protoco扩展类需要用到,如自适应时使用
    // 返回值:Exporter<T>,引用的是被暴漏的service,之后如果是需要取消暴漏的话需要用到
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    void destroy();
}

2.1:Protocol$Adaptive

如何获取该类信息可以参考1.3:exportLocal

public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
    public void destroy() {
        // 因为没有标注@Adaptive注解,所以直接抛出java.lang.UnsupportedOperationException
        throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }
    
    // 因为没有标注@Adaptive注解,所以直接抛出java.lang.UnsupportedOperationException
    public int getDefaultPort() {
        throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
    }

    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }

    public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.export(arg0);
    }
}

2.2:ProtocolListenerWrapper

源码如下:

class FakeCls {
    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        // 以下远端暴漏才会执行,这里可以忽略,因为url是injvm://打头
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        // 2022-01-23 19:29:28
        return new ListenerExporterWrapper<T>(protocol.export(invoker),
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                        .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
    }
}

2022-01-23 19:29:28protocol.export(invoker)继续调用装饰的protocol类,这里调用的就是ProtocolFilterWrapper,关于该类参考2.3:ProtocolFilterWrapperCollections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class).getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY))处是使用keyexporter.listener,从url获取值从而获取要激活的ExporterListener扩展类。ListenerExporterWrapper构造函数源码如下:

class FakeCls {
    public ListenerExporterWrapper(Exporter<T> exporter, List<ExporterListener> listeners) {}
}

该监听器的作用是用来监听Exporter暴漏完毕和取消暴漏完毕。

2.3:ProtocolFilterWrapper

主要用于给Invoker增加Filter过滤器链,在调用真正的服务方法之前会调用过滤器Filter的逻辑,具体参考2.3.1:export

2.3.1:export

源码如下:

class FakeCls {
    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        // 此处要求协议是registry://,即远程暴露,这里是injvm://,所以可以忽略
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        // 2022-01-24 16:02:53
        // 这里的protocol就是InJvmProtocol
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }

}

2022-01-24 16:02:53buildInvokerChain添加Filter链,具体参考2.3.2:buildInvokerChain

2.3.2:buildInvokerChain

源码如下:

class FakeCls {
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        // 获取激活的Filter扩展类集合
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                // 将Filter封装成Invoker,调用invoke方法时,内部链式调用下一个Invoker,最后一个Invoker就是目标服务类方法的Invoker
                last = new Invoker<T>() {

                    @Override
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    @Override
                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    @Override
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        // 这行代码比较关键,将next作为参数调用Filter类方法,在Filter类方法内部我们就可以通过invoker.invoke来继续向下调用了,最终调用到真正服务类方法
                        return filter.invoke(next, invocation);
                    }

                    @Override
                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }
}

越靠后的Filter越先执行先执行,执行顺序如filter1->filte2->filter3->...->服务类方法

2.4:InjvmProtocol

该类是Injvm协议的实现类,我们还是从入口方法export开始。

2.4.1:export

源码如下:

class FakeCls {
    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
    }
}

主要是创建了InjvmExporter对象。关于该对象,具体参考3:Exporter

3:Exporter

该接口用于基于相关协议来暴露服务,接口源码如下:

public interface Exporter<T> {
    // 获取内部的Invoker
    Invoker<T> getInvoker();
    // 取消暴露
    void unexport();
}

主要类图如下:

dubbo之服务本地暴露

接下来我们从类AbstractExporter开始来看以下。

3.1:AbstractExporter

源码如下:

public abstract class AbstractExporter<T> implements Exporter<T> {

    protected final Logger logger = LoggerFactory.getLogger(getClass());
    // 内部的Invoker
    private final Invoker<T> invoker;
    // 是否没有暴露的标记
    private volatile boolean unexported = false;

    public AbstractExporter(Invoker<T> invoker) {
        if (invoker == null)
            throw new IllegalStateException("service invoker == null");
        // 必须是接口
        if (invoker.getInterface() == null)
            throw new IllegalStateException("service type == null");
        // 必须有暴露的URL
        if (invoker.getUrl() == null)
            throw new IllegalStateException("service url == null");
        this.invoker = invoker;
    }

    @Override
    public Invoker<T> getInvoker() {
        return invoker;
    }

    // 取消暴露,其实就是调用getInvoker().destroy();
    @Override
    public void unexport() {
        if (unexported) {
            return;
        }
        unexported = true;
        getInvoker().destroy();
    }

    @Override
    public String toString() {
        return getInvoker().toString();
    }
}

3.2:InjvmExporter

AbstractExporter的子类,源码如下:

class InjvmExporter<T> extends AbstractExporter<T> {
    // 服务键,一般是服务接口的全限定类名称
    private final String key;
    // 已经暴露的Exporter
    private final Map<String, Exporter<?>> exporterMap;

    InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap) {
        super(invoker);
        this.key = key;
        this.exporterMap = exporterMap;
        exporterMap.put(key, this);
    }
    
    // 取消暴露
    @Override
    public void unexport() {
        super.unexport();
        exporterMap.remove(key);
    }
}

3.3 ListenerExporterWrapper

具有监听功能的Exporter的Wrapper类,源码如下:

public class ListenerExporterWrapper<T> implements Exporter<T> {

    private static final Logger logger = LoggerFactory.getLogger(ListenerExporterWrapper.class);

    private final Exporter<T> exporter;
    // 注册的暴露监听器
    private final List<ExporterListener> listeners;

    public ListenerExporterWrapper(Exporter<T> exporter, List<ExporterListener> listeners) {
        if (exporter == null) {
            throw new IllegalArgumentException("exporter == null");
        }
        this.exporter = exporter;
        this.listeners = listeners;
        // 构造函数执行,代表服务暴露了,执行对应的监听器的暴露方法exported
        if (listeners != null && !listeners.isEmpty()) {
            RuntimeException exception = null;
            for (ExporterListener listener : listeners) {
                if (listener != null) {
                    try {
                        listener.exported(this);
                    } catch (RuntimeException t) {
                        logger.error(t.getMessage(), t);
                        exception = t;
                    }
                }
            }
            if (exception != null) {
                throw exception;
            }
        }
    }

    @Override
    public Invoker<T> getInvoker() {
        return exporter.getInvoker();
    }

    @Override
    public void unexport() {
        // 取消暴露,执行监听器的unexported方法
        try {
            exporter.unexport();
        } finally {
            if (listeners != null && !listeners.isEmpty()) {
                RuntimeException exception = null;
                for (ExporterListener listener : listeners) {
                    if (listener != null) {
                        try {
                            listener.unexported(this);
                        } catch (RuntimeException t) {
                            logger.error(t.getMessage(), t);
                            exception = t;
                        }
                    }
                }
                if (exception != null) {
                    throw exception;
                }
            }
        }
    }
}

ExporterListener参考4:ExporterListener

4:ExporterListener

源码如下:

@SPI
public interface ExporterListener {
    // 服务暴露调用的方法
    void exported(Exporter<?> exporter) throws RpcException;
    // 服务取消暴露调用的方法
    void unexported(Exporter<?> exporter);
}

类图如下:

dubbo之服务本地暴露

接下来看下这个唯一的实现类ExporterListenerAdapter,如下:

public abstract class ExporterListenerAdapter implements ExporterListener {

    @Override
    public void exported(Exporter<?> exporter) throws RpcException {
    }

    @Override
    public void unexported(Exporter<?> exporter) throws RpcException {
    }

}

也仅仅是个空实现,没有实际的逻辑。

上一篇:acm寒假训练第一周总结


下一篇:NEUQ 字符串 第九届“图灵杯”NEUQ-ACM程序设计竞赛个人赛