码迷,mamicode.com
首页 > 其他好文 > 详细

Dubbo服务暴露

时间:2021-03-01 13:35:13      阅读:0      评论:0      收藏:0      [点我收藏+]

标签:ima   协议   ringbuf   gets   监控   factory   ada   ddr   boot   

Dubbo服务暴露的过程整体分为:生成Invoker、根据不同的协议(protocol)将Invoker转换成Exporter。

生成Invoker

生成Invoker的代码:

Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));

此处的proxyFactoryProxyFactory$Adaptive

url为

registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?
application=userprovider&dubbo=2.0.2&export=dubbo%3A%2F%2F169.254.37.154%3A20891%2Fcom.wss.intf.UserInfo%3Fanyhost%3Dtrue%26application%3Duser-provider%26bind.ip%3D169.254.37.154%26bind.port%3D20891%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.wss.intf.UserInfo%26methods%3DsayHello%2CgetInfo%26pid%3D1972%26release%3D2.7.0%26side%3Dprovider%26timestamp%3D1614437158686&pid=1972&registry=zookeeper&release=2.7.0&timestamp=1614437158682

其中getInvoker的逻辑是:

public Invoker getInvoker(Object object, Class class_, URL uRL) throws RpcException {
        if (uRL == null) {
            throw new IllegalArgumentException("url == null");
        }
        URL uRL2 = uRL;
        String string = uRL2.getParameter("proxy", "javassist");
        if (string == null) {
            throw new IllegalStateException(new StringBuffer().append("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(").append(uRL2.toString()).append(") use keys([proxy])").toString());
        }
        //JavassistProxyFactory
        ProxyFactory proxyFactory = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(string);
        return proxyFactory.getInvoker(object, class_, uRL);
    }

ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension("javassist");这段代码的结果是

技术图片

也就是StubProxyFactoryWrapper(JavassistProxyFactory)这种包装的结构。

而在StubProxyFactoryWrappergetInvoker的方法中,并没进行过多的处理,而是交给了JavassistProxyFactory

技术图片

JavassistProxyFactorygetInvoker方法如下:

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        // TODO Wrapper cannot handle this scenario correctly: the classname contains ‘$‘
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf(‘$‘) < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

技术图片

可以看到最终生成的Invoker就是AbstractProxyInvoker

根据协议,转换成Exporter

DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
                        Exporter<?> exporter = protocol.export(wrapperInvoker);

DelegateProviderMetaDataInvoker类仅仅是对生成的Invoker的包装。

此处的protocol指的是ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();生成的Adaptive类。

export方法的逻辑是

@Override
    public Exporter export(Invoker invoker) throws RpcException {
        String string;
        if (invoker == null) {
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        }
        if (invoker.getUrl() == null) {
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        }
        URL uRL = invoker.getUrl();
        String string2 = string = uRL.getProtocol() == null ? "dubbo" : uRL.getProtocol();
        if (string == null) {
            throw new IllegalStateException(new StringBuffer().append("Fail to get extension(org.apache.dubbo.rpc.Protocol) name from url(").append(uRL.toString()).append(") use keys([protocol])").toString());
        }
        Protocol protocol = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(string);
        return protocol.export(invoker);
    }

Dubbo在服务暴露的时候,Url的protocol是registry,因此是ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("registry"),得到的结果是ProtocolListenerWrapper(QosProtocolWrapper(ProtocolFilterWrapper(RegistryProtocol)))

启动QosServer运维服务

ProtocolListenerWrapper的export方法简单的调用内部QosProtocolWrapper的export方法。

org.apache.dubbo.qos.protocol.QosProtocolWrapper#export逻辑如下:

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            startQosServer(invoker.getUrl());
            return protocol.export(invoker);
        }
        return protocol.export(invoker);
    }
    private void startQosServer(URL url) {
        try {
            boolean qosEnable = url.getParameter(QOS_ENABLE,true);//qosEnable:true
            if (!qosEnable) {
                logger.info("qos won‘t be started because it is disabled. " +
                        "Please check dubbo.application.qos.enable is configured either in system property, " +
                        "dubbo.properties or XML/spring-boot configuration.");
                return;
            }

            if (!hasStarted.compareAndSet(false, true)) {
                return;
            }

            int port = url.getParameter(QOS_PORT, QosConstants.DEFAULT_PORT);//qos端口默认是22222
            boolean acceptForeignIp = Boolean.parseBoolean(url.getParameter(ACCEPT_FOREIGN_IP,"false"));
            Server server = Server.getInstance();
            server.setPort(port);
            server.setAcceptForeignIp(acceptForeignIp);
            server.start();

        } catch (Throwable throwable) {
            logger.warn("Fail to start qos server: ", throwable);
        }
    }
    public void start() throws Throwable {
        if (!started.compareAndSet(false, true)) {
            return;
        }
        //nio 客户端 启动
        boss = new NioEventLoopGroup(0, new DefaultThreadFactory("qos-boss", true));//为0的话,线程数为cpu核心数的2倍
        worker = new NioEventLoopGroup(0, new DefaultThreadFactory("qos-worker", true));
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boss, worker);
        serverBootstrap.channel(NioServerSocketChannel.class);
        serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
        serverBootstrap.childHandler(new ChannelInitializer<Channel>() {

            @Override
            protected void initChannel(Channel ch) throws Exception {
                ch.pipeline().addLast(new QosProcessHandler(welcome, acceptForeignIp));
            }
        });
        try {
            serverBootstrap.bind(port).sync();
            logger.info("qos-server bind localhost:" + port);
        } catch (Throwable throwable) {
            logger.error("qos-server can not bind localhost:" + port, throwable);
            throw throwable;
        }
    }

可以看到QosProtocolWrapper完成了本地NettyServer的启动,实现运维监控的功能。

ProtocolFilterWrapper的export方法也是简单调用RegistryProtocol的export方法。

注册服务到注册中心和监听目录

org.apache.dubbo.registry.integration.RegistryProtocol#export逻辑如下:

@Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally
        URL providerUrl = getProviderUrl(originInvoker);

        // Subscribe the override data
        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
        //  the same service. Because the subscribed is cached key with the name of the service, it causes the
        //  subscription information to cover.
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        //绑定zk监听器
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //export invoker

        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); // 服务暴露,启动tomcat
        // url to registry
        final Registry registry = getRegistry(originInvoker);//创建注册中心实例
        final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
        ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
                registryUrl, registeredProviderUrl);
        //to judge if we need to delay publish
        boolean register = registeredProviderUrl.getParameter("register", true);
        if (register) {
            register(registryUrl, registeredProviderUrl); // 服务注册
            providerInvokerWrapper.setReg(true);
        }

        // Deprecated! Subscribe to override rules in 2.6.x or before.
        // 监听的是该服务configurators下路径的变化
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);
        //Ensure that a new exporter instance is returned every time export
        return new DestroyableExporter<>(exporter);
    }

需要注意的是这个方法逻辑不仅仅是往zk注册和监听,还包括启动暴露本地服务。

启动本地服务的逻辑是org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport

final Invoker<?> invokerDelegete = new InvokerDelegate<T>(originInvoker, providerUrl);
                    //注意此处,protocol 不仅仅是httpprotocol,也可能是包装类(ProtocolListenerWrapper、ProtocolFilterWrapper)
                    exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);

这个地方需要注意的是providerUrl已经不是刚才的registryUrl,值为

dubbo://169.254.37.154:20891/com.wss.intf.UserInfo?anyhost=true&application=user-provider&bind.ip=169.254.37.154&bind.port=20891&dubbo=2.0.2&generic=false&interface=com.wss.intf.UserInfo&methods=sayHello,getInfo&pid=6884&release=2.7.0&side=provider&timestamp=1614478602081

可以看到providerUrl协议是dubbo,因此 protocol.export(invokerDelegete)走的逻辑又不一样了。

暴露Dubbo服务

上面registryURL协议是regisstryprotocol.export的结果是ProtocolListenerWrapper(QosProtocolWrapper(ProtocolFilterWrapper(RegistryProtocol))),那么providerUrl的协议是dubbo,得到的结果是

ProtocolListenerWrapper(QosProtocolWrapper(ProtocolFilterWrapper(DubboProtocol)))

org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper#export的逻辑如下:

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }

        //injvm:  InjvmExporter,ListenerExporterWrapper初始化的时候会调用监听器的export方法

        return new ListenerExporterWrapper<T>(protocol.export(invoker),
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                        .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
    }

可以看到ProtocolListenerWrapper的返回值包装了QosProtocolWrapper的export方法的返回值。

org.apache.dubbo.qos.protocol.QosProtocolWrapper#export的逻辑如下:

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            startQosServer(invoker.getUrl());
            return protocol.export(invoker);
        }
        return protocol.export(invoker);
    }

可以看到就是简单的调用ProtocolFilterWrapper的export方法,

org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper#export逻辑如下:

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            return protocol.export(invoker);
        }
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }

构建过滤器链

org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper#buildInvokerChain

    //消费者: key:reference.filter  group:consumer
    //生产者: key:service.filter   group:provider
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;//InvokerDelegate
    //如果url中含有service.filter,例如dubbo://192.168.1.7:9090/com.alibaba.service1?server.filter=-defalut,value1 去掉默认的,添加value1(spi文件中的key)(此处的url中没有server.filter)
    //@Activate注解修饰的group为provider
    //符合上述2个条件之一的filter才会被筛选出来。
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (!filters.isEmpty()) {
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new Invoker<T>() {

                @Override
                public Class<T> getInterface() {
                    return invoker.getInterface();
                }

                @Override
                public URL getUrl() {
                    return invoker.getUrl();
                }

                @Override
                public boolean isAvailable() {
                    return invoker.isAvailable();
                }

                @Override
                public Result invoke(Invocation invocation) throws RpcException {
                    Result result = filter.invoke(next, invocation);//此处的filter就是buildInvokerChain返回的last
                    if (result instanceof AsyncRpcResult) {
                        AsyncRpcResult asyncResult = (AsyncRpcResult) result;
                        asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation));
                        return asyncResult;
                    } else {
                        return filter.onResponse(result, invoker, invocation);
                    }
                }

                @Override
                public void destroy() {
                    invoker.destroy();
                }

                @Override
                public String toString() {
                    return invoker.toString();
                }
            };
        }
    }
    return last;//MyFilter、MonitorFilter、FutureFilter、ConsumerContextFilter
}

关于@Adaptive注解

关于@Activate注解

方法执行完,过滤器链构建完成。

启动Netty服务器暴露服务

org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export

@Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }
        //启动nettyserver
        openServer(url);
        optimizeSerialization(url);
        return exporter;
    }

org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#openServer

    private void openServer(URL url) {
        ....
          createServer(url)
		....
    }
private ExchangeServer createServer(URL url) {
        // send readonly event when server closes, it‘s enabled by default
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }

        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        ExchangeServer server;
        try {
            //实际上调用HeaderExchanger的bind方法。
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }

org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind

//handler是org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#requestHandler的一个匿名实现类。    
@Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        //
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

最终将NettyServer启动提供服务。

Dubbo服务暴露

标签:ima   协议   ringbuf   gets   监控   factory   ada   ddr   boot   

原文地址:https://www.cnblogs.com/mwss/p/14458678.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!