Dubbo服务导出如何实现的?我们从中能够学习到什么?

Dubbo通过注解或者xml形式标识Dubbo服务,在Spring 容器发布刷新事件,会立即执行服务导出逻辑,示例如下:

import com.alibaba.dubbo.config.annotation.Service;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;
import java.util.stream.Collectors;

/**
 * 价格服务
 **/
@Service
@Component
public class PriceFacade implements IPriceFacade {
}

Dubbo服务导出分为本地导出和远程导出,本地服务导出就是本地方法的调用,远程导出就是通过设置的通信方式进行远程服务调用,服务导出后,Dubbo还支持向不同的注册中心注册导出的服务。以下重点分析多协议多注册中心服务导出的过程,其大致流程如下:

Dubbo服务导出如何实现的?我们从中能够学习到什么?

简单总结服务导出的流程,就是Dubbo根据设置的协议导出服务,如果是远程服务导出,则根据设置的协议,例如TCP/IP或者HTTP协议进行远程通信。如果设置了服务注册中心,则会在服务导出后,向注册中心注册服务。

在Dubbo中,对于协议Protocol,通信Transporter,以及服务注册Registry的选择都是基于Dubbo SPI自适应机制实现的。如果,配置了服务导出的过滤器或者监听器,也会根据Dubbo SPI自动激活的机制加载对应的集合对象。Dubbo SPI的机制详解可以参考Dubbo SPI是什么?Dubbo SPI如何实现?我们从中能够学习到什么?

多协议多注册中心

在Spring 容器发布刷新事件,会调用Dubbo配置ServiceConfig的export方法进行服务导出。在doExportUrls方法中执行多协议多注册中心的服务导出逻辑。在doExportUrlsFor1Protocol方法中,根据到协议导出服务,根据配置执行本地服务导出或远程服务导出,并且根据是否有注册中心的配置执行舒服注册逻辑,其部分源码如下:

// ServiceConfig.class
// 服务导出
public synchronized void export() {
        if (!shouldExport()) {
            return;
        }

        if (bootstrap == null) {
            bootstrap = DubboBootstrap.getInstance();
            bootstrap.init();
        }

        //------- 省略 ---------//

        if (shouldDelay()) {
            DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }

        exported();
    }

// ServiceConfig.class
// 多协议服务导出,多注册中心注册
protected synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
        }
        if (exported) {
            return;
        }
        exported = true;

        if (StringUtils.isEmpty(path)) {
            path = interfaceName;
        }
        doExportUrls();
    }

// ServiceConfig.class
// 服务导出
private void doExportUrls() {
        ServiceRepository repository = ApplicationModel.getServiceRepository();
        ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
        repository.registerProvider(
                getUniqueServiceName(),
                ref,
                serviceDescriptor,
                this,
                serviceMetadata
        );
        // 注册中心URL
        List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
        // 多协议
        for (ProtocolConfig protocolConfig : protocols) {
            String pathKey = URL.buildKey(getContextPath(protocolConfig)
                    .map(p -> p + "/" + path)
                    .orElse(path), group, version);
         repository.registerService(pathKey, interfaceClass);
            serviceMetadata.setServiceKey(pathKey);
            // 单个协议的服务导出
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

// ServiceConfig.class
// 单个协议的服务导出
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
        String name = protocolConfig.getName();
        if (StringUtils.isEmpty(name)) {
            name = DUBBO;
        }

        //------- 省略参数设置部分 --------//
        // 服务导出
        if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

            // 本地服务导出
            if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            // 远程服务导出
            if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
                // 包含注册中心配置
                if (CollectionUtils.isNotEmpty(registryURLs)) {
                    for (URL registryURL : registryURLs) {
                        if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                            continue;
                        }
                        url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                        // 监视器配置
                        URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
                        if (monitorUrl != null) {
                            url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                        } 
                        String proxy = url.getParameter(PROXY_KEY);
                        if (StringUtils.isNotEmpty(proxy)) {
                            registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                        }
                        // 动态代理生成Invoker
                        Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) 
                        // Invoker包装类
interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                        DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                        // 协议导出服务
                        Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                        exporters.add(exporter);
                    }
                }
// 没有注册中心直接导出服务
 else {
                    // 动态代理生成Invoker
                    Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                    // 协议导出服务
                    Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                    exporters.add(exporter);
                }

                WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
                if (metadataService != null) {
                    metadataService.publishServiceDefinition(url);
                }
            }
        }
        this.urls.add(url);
    }

动态代理Invoker

在服务导出过程中,使用动态代理的技术,把所有dubbo服务都转换为Invoker对象,通过调用invoke方法,Invoker统一了所有服务的调用方式,Invoker接口定义如下:

public interface Invoker<T> extends Node {

    /**
     * 调用服务Class对象
     */
    Class<T> getInterface();

    /**
     * 代理方法,invocation封装了服务调用参数
     */
    Result invoke(Invocation invocation) throws RpcException;

}

在Dubbo中,使用Javassist和JDK动态代理两种方式,实现了服务代理类生成,Dubbo默认使用Javassist方式。具体的动态代理定义可以参考代理(Proxy)是什么?为什么要使用代理模式?Spring与Dubbo如何运用代理模式的?。Javassist和JDK动态代理两种生成Invoker的源码如下:

/**
 * javassist
 */
public class JavassistProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}

/**
 * Jdk动态代理
 */
public class JdkProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                Method method = proxy.getClass().getMethod(methodName, parameterTypes);
                return method.invoke(proxy, arguments);
            }
        };
    }

}

服务导出协议

在Dubbo中,支持不同协议的服务导出,主要的包括本地服务导出,基于TCP/IP,HTTP,RMI,THRIFT等等。定义了Protocol接口统一了对不同协议的封装,接口其服务导出,引用的定义如下:

public interface Protocol {

    /**
     * 导出服务
     */
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    /**
     * 引用服务
     */
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
}

 在Dubbo中,分别对不同基础协议定义了不同的实现类,如下:

  • InjvmProtocol本地导出
  • DubboProtocol默认协议设置,使用TCP/IP通信协议导出服务
  • HttpProtocol,使用HTTP协议导出服务
  • RestProtocol,基于Rest 风格导出服务
  • HessianProtocol,RmiProtocol,ThriftProtocol,WebServiceProtocol,XmlRpcProtocol

其类结构图如下:

Dubbo服务导出如何实现的?我们从中能够学习到什么?

使用装饰模式(具体如何使用可以参考装饰器模式-Mybatis教你如何玩),对Protocol进行装饰,其实现的装饰类如下:

  • ProtocolFilterWrapper,过滤器装饰类,对Protocol的export和refer方法,增加链式的过滤处理。例如,日志打印,并发访问控制,类加载转换等等操作。
  • ProtocolListenerWrapper,事件监听装饰类,定义ExporterListener监听服务导出成功或者失败的事件。

其类结构图如下:

Dubbo服务导出如何实现的?我们从中能够学习到什么?

通信协议

不同的协议使用不同的通信方式进行服务的远程调用,主要包括TCP/IP,HTTP通信。DubboProtocol采用的TCP/IP协议通信,HttpProtocol采用HTTP协议进行通信。Dubbo定义Transporter接口,监听服务端口以及链接客户端,其接口定义如下:

public interface Transporter {

    /**
     * 启动服务端,绑定监听端口
     */
    @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
    RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException;

    /**
     * 链接服务端,创建客户端
     */
    @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
    Client connect(URL url, ChannelHandler handler) throws RemotingException;

}

采用mina和netty两种框架进行实现,其实现类分别为MinaTransporter,基于netty3的NettyTransporter 和基于netty4的NettyTransporter,其类结构图如下:

Dubbo服务导出如何实现的?我们从中能够学习到什么?

定义HttpBinder接口,定义基于HTTP通信协议,其接口定义如下:

public interface HttpBinder {

    /**
     * 绑定HTTP服务器
     */
    @Adaptive({Constants.SERVER_KEY})
    HttpServer bind(URL url, HttpHandler handler);

}

其实现类分别为JettyHttpBinder,ServletHttpBinder,TomcatHttpBinder,其类结构图如下:

Dubbo服务导出如何实现的?我们从中能够学习到什么?

注册中心

如果导出服务时,配置了注册中心,则Dubbo会根据SPI机制,动态的选择RegistryProtocol。RegistryProtocol使用了装饰模式,具体的服务导出逻辑,交由其设置的Protocol。在export中,定义向注册中心的逻辑。其部分源码如下:

// 实现Protocol接口
public class RegistryProtocol implements Protocol {

// 被装饰的Protocol
private Protocol protocol;


 @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {

        // 注册中心配置
        URL registryUrl = getRegistryUrl(originInvoker);
        URL providerUrl = getProviderUrl(originInvoker);

        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        // 使用protocol导出服务
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
        final Registry registry = getRegistry(originInvoker);
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        if (register) {
            // 向设置的注册中心注册服务
            register(registryUrl, registeredProviderUrl);
        }
        registerStatedUrl(registryUrl, registeredProviderUrl, register);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        notifyExport(exporter);
        return new DestroyableExporter<>(exporter);
    }

    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);

        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
            Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);

            // 使用被装饰的protocol导出服务
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
        });
    }

/**
 * 使用Registry向注册中心注册服务
 */
private void register(URL registryUrl, URL registeredProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registeredProviderUrl);
    }
}

在Dubbo中,定义Registry接口,向注册中心注册服务,其接口定义如下:

public interface Registry extends Node, RegistryService {
    default void reExportRegister(URL url) {
        register(url);
    }

    default void reExportUnregister(URL url) {
        unregister(url);
    }
}

 其具体的实现类包括,NacosRegistry,ZookeeperRegistry,SofaRegistry和RedisRegistry,其类结构图如下:

 

Dubbo服务导出如何实现的?我们从中能够学习到什么?

我们从中能够学习到什么 

设计模式的使用

动态代理的使用

使用Javassit或者JDK动态代理技术生成Invoker实例,统一了对所有服务方法的执行,并且方便对Invoker进行其他逻辑的装饰。具体的动态代理理解可以参考代理(Proxy)是什么?为什么要使用代理模式?Spring与Dubbo如何运用代理模式的?

通信协议的使用

在远程服务绑定服务时,使用了mina与netty框架,实现了对TCP/IP协议,基于NIO进行远程服务通信,NIO的详细使用可以参考Netty如何实现常见的两种线程模式?dubbo基于Netty发布服务时如何选择线程模式?

上一篇:Dubbo的服务暴露过程


下一篇:Dubbo之服务消费原理