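Dubbo's service export process has two main steps: generating an Invoker, then converting that Invoker into an Exporter according to the protocol in use.
The code that generates the Invoker: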
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
Here proxyFactory is ProxyFactory$Adaptive, and the url is:
registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=userprovider&dubbo=2.0.2&export=dubbo%3A%2F%2F169.254.37.154%3A20891%2Fcom.wss.intf.UserInfo%3Fanyhost%3Dtrue%26application%3Duser-provider%26bind.ip%3D169.254.37.154%26bind.port%3D20891%26dubbo%3D2.0.2%26generic%3Dfalse%26interface%3Dcom.wss.intf.UserInfo%26methods%3DsayHello%2CgetInfo%26pid%3D1972%26release%3D2.7.0%26side%3Dprovider%26timestamp%3D1614437158686&pid=1972&registry=zookeeper&release=2.7.0&timestamp=1614437158682
The logic of getInvoker in this adaptive class is:
public Invoker getInvoker(Object object, Class class_, URL uRL) throws RpcException {
    if (uRL == null) {
        throw new IllegalArgumentException("url == null");
    }
    URL uRL2 = uRL;
    String string = uRL2.getParameter("proxy", "javassist");
    if (string == null) {
        throw new IllegalStateException(new StringBuffer().append("Fail to get extension(org.apache.dubbo.rpc.ProxyFactory) name from url(").append(uRL2.toString()).append(") use keys([proxy])").toString());
    }
    // JavassistProxyFactory
    ProxyFactory proxyFactory = (ProxyFactory)ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(string);
    return proxyFactory.getInvoker(object, class_, uRL);
}
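The adaptive dispatch above can be reduced to a small sketch: read a parameter from the URL, fall back to a default name, look up the extension by that name, and delegate. The class below is only an illustration. The ProxyFactory interface, the EXTENSIONS map and the string return values are hypothetical stand-ins for Dubbo's real types and for ExtensionLoader.

```java
import java.util.HashMap;
import java.util.Map;

public class AdaptiveDispatchSketch {
    /** Hypothetical stand-in for ProxyFactory; returns a label instead of an Invoker. */
    interface ProxyFactory {
        String getInvoker(String ref);
    }

    // Hypothetical registry standing in for ExtensionLoader's name -> extension lookup.
    static final Map<String, ProxyFactory> EXTENSIONS = new HashMap<>();

    static {
        EXTENSIONS.put("javassist", ref -> "javassist:" + ref);
        EXTENSIONS.put("jdk", ref -> "jdk:" + ref);
    }

    /** Mirrors the adaptive logic: read "proxy", default to "javassist", then delegate. */
    static String getInvoker(String ref, Map<String, String> urlParams) {
        if (urlParams == null) {
            throw new IllegalArgumentException("url == null");
        }
        String name = urlParams.getOrDefault("proxy", "javassist");
        ProxyFactory factory = EXTENSIONS.get(name);
        if (factory == null) {
            throw new IllegalStateException("No such extension: " + name);
        }
        return factory.getInvoker(ref);
    }

    public static void main(String[] args) {
        // No "proxy" parameter: the default extension is used.
        System.out.println(getInvoker("userInfo", new HashMap<>()));
        // Explicit "proxy" parameter: the named extension is used.
        Map<String, String> params = new HashMap<>();
        params.put("proxy", "jdk");
        System.out.println(getInvoker("userInfo", params));
    }
}
```

The URL in this walkthrough carries no proxy parameter, which is why the default name javassist is the one that gets resolved.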
The call
ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension("javassist");
returns StubProxyFactoryWrapper(JavassistProxyFactory), that is, a JavassistProxyFactory wrapped in a StubProxyFactoryWrapper.
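A wrapper structure like this can be sketched as plain decoration: each wrapper implements the same interface and takes the instance it wraps in its constructor. The types below are simplified, hypothetical stand-ins that only report their nesting. How Dubbo actually discovers and orders its wrapper classes is not modeled here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class WrapperSketch {
    /** Hypothetical, reduced extension interface. */
    interface ProxyFactory {
        String describe();
    }

    static class JavassistProxyFactory implements ProxyFactory {
        public String describe() {
            return "JavassistProxyFactory";
        }
    }

    /** A wrapper: same interface, and a constructor that accepts the wrapped instance. */
    static class StubProxyFactoryWrapper implements ProxyFactory {
        private final ProxyFactory inner;

        StubProxyFactoryWrapper(ProxyFactory inner) {
            this.inner = inner;
        }

        public String describe() {
            return "StubProxyFactoryWrapper(" + inner.describe() + ")";
        }
    }

    /** Applies each wrapper in turn, so the last one applied ends up outermost. */
    static ProxyFactory wrap(ProxyFactory base, List<UnaryOperator<ProxyFactory>> wrappers) {
        ProxyFactory current = base;
        for (UnaryOperator<ProxyFactory> wrapper : wrappers) {
            current = wrapper.apply(current);
        }
        return current;
    }

    static ProxyFactory build() {
        List<UnaryOperator<ProxyFactory>> wrappers = new ArrayList<>();
        wrappers.add(StubProxyFactoryWrapper::new);
        return wrap(new JavassistProxyFactory(), wrappers);
    }

    public static void main(String[] args) {
        System.out.println(build().describe());
    }
}
```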
The getInvoker method of StubProxyFactoryWrapper does little processing of its own and delegates to JavassistProxyFactory, whose getInvoker method is as follows:
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
    // TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
    final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
    return new AbstractProxyInvoker<T>(proxy, type, url) {
        @Override
        protected Object doInvoke(T proxy, String methodName,
                                  Class<?>[] parameterTypes,
                                  Object[] arguments) throws Throwable {
            return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
        }
    };
}
As shown, the Invoker that is finally generated is an anonymous subclass of AbstractProxyInvoker.
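The idea behind this Invoker is to turn "method name + parameter types + arguments" into a call on the service implementation. The sketch below shows the same idea with plain reflection instead of the Javassist-generated Wrapper, so it is a substitute technique and not what JavassistProxyFactory does internally. The Invoker interface here is a reduced, hypothetical one, and the sayHello signature is an assumption (the source only gives the method name).

```java
import java.lang.reflect.Method;

public class ReflectiveInvokerSketch {
    /** Assumed service interface; only the method name sayHello comes from the URL above. */
    public interface UserInfo {
        String sayHello(String name);
    }

    public static class UserInfoImpl implements UserInfo {
        public String sayHello(String name) {
            return "hello " + name;
        }
    }

    /** Hypothetical, reduced Invoker: dispatches by method name and parameter types. */
    interface Invoker {
        Object invoke(String methodName, Class<?>[] parameterTypes, Object[] arguments);
    }

    /** Builds an Invoker that forwards every call to target through reflection. */
    static Invoker getInvoker(Object target, Class<?> type) {
        return (methodName, parameterTypes, arguments) -> {
            try {
                Method method = type.getMethod(methodName, parameterTypes);
                return method.invoke(target, arguments);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Failed to invoke " + methodName, e);
            }
        };
    }

    public static void main(String[] args) {
        Invoker invoker = getInvoker(new UserInfoImpl(), UserInfo.class);
        System.out.println(invoker.invoke("sayHello", new Class<?>[] {String.class}, new Object[] {"dubbo"}));
    }
}
```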
The Invoker is then wrapped and passed to the protocol to be converted into an Exporter:
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
DelegateProviderMetaDataInvoker is merely a wrapper around the generated Invoker.
The protocol here is the Adaptive class generated by ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
The logic of its export method is:
@Override
public Exporter export(Invoker invoker) throws RpcException {
    String string;
    if (invoker == null) {
        throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
    }
    if (invoker.getUrl() == null) {
        throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
    }
    URL uRL = invoker.getUrl();
    String string2 = string = uRL.getProtocol() == null ? "dubbo" : uRL.getProtocol();
    if (string == null) {
        throw new IllegalStateException(new StringBuffer().append("Fail to get extension(org.apache.dubbo.rpc.Protocol) name from url(").append(uRL.toString()).append(") use keys([protocol])").toString());
    }
    Protocol protocol = (Protocol)ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(string);
    return protocol.export(invoker);
}
During service export the protocol of the URL is registry, so the lookup is ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("registry"), and the result is ProtocolListenerWrapper(QosProtocolWrapper(ProtocolFilterWrapper(RegistryProtocol))).
The export method of ProtocolListenerWrapper simply calls the export method of the inner QosProtocolWrapper.
The logic of org.apache.dubbo.qos.protocol.QosProtocolWrapper#export is as follows:
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        startQosServer(invoker.getUrl());
        return protocol.export(invoker);
    }
    return protocol.export(invoker);
}
private void startQosServer(URL url) {
    try {
        boolean qosEnable = url.getParameter(QOS_ENABLE, true); // qosEnable defaults to true
        if (!qosEnable) {
            logger.info("qos won't be started because it is disabled. " +
                    "Please check dubbo.application.qos.enable is configured either in system property, " +
                    "dubbo.properties or XML/spring-boot configuration.");
            return;
        }
        if (!hasStarted.compareAndSet(false, true)) {
            return;
        }
        int port = url.getParameter(QOS_PORT, QosConstants.DEFAULT_PORT); // the default qos port is 22222
        boolean acceptForeignIp = Boolean.parseBoolean(url.getParameter(ACCEPT_FOREIGN_IP, "false"));
        Server server = Server.getInstance();
        server.setPort(port);
        server.setAcceptForeignIp(acceptForeignIp);
        server.start();
    } catch (Throwable throwable) {
        logger.warn("Fail to start qos server: ", throwable);
    }
}
Server#start, called above through server.start(), is as follows:
public void start() throws Throwable {
    if (!started.compareAndSet(false, true)) {
        return;
    }
    // start the NIO server
    boss = new NioEventLoopGroup(0, new DefaultThreadFactory("qos-boss", true)); // 0 means Netty's default thread count, which is 2 x the number of CPU cores unless overridden
    worker = new NioEventLoopGroup(0, new DefaultThreadFactory("qos-worker", true));
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap.group(boss, worker);
    serverBootstrap.channel(NioServerSocketChannel.class);
    serverBootstrap.childOption(ChannelOption.TCP_NODELAY, true);
    serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
    serverBootstrap.childHandler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
            ch.pipeline().addLast(new QosProcessHandler(welcome, acceptForeignIp));
        }
    });
    try {
        serverBootstrap.bind(port).sync();
        logger.info("qos-server bind localhost:" + port);
    } catch (Throwable throwable) {
        logger.error("qos-server can not bind localhost:" + port, throwable);
        throw throwable;
    }
}
As shown, QosProtocolWrapper starts a local Netty server (the QoS server), which provides the operations and monitoring functions.
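Both startQosServer and start guard themselves with compareAndSet(false, true), so the QoS server is started at most once no matter how many services are exported. A minimal sketch of that guard follows. The class name and the counter are hypothetical, and the counter merely stands in for binding the port.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class StartOnceSketch {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private final AtomicInteger startCount = new AtomicInteger();

    /** Returns true only for the single call that actually performs the start. */
    boolean start() {
        if (!started.compareAndSet(false, true)) {
            return false; // another caller has already started the server
        }
        startCount.incrementAndGet(); // stands in for binding the port
        return true;
    }

    int startCount() {
        return startCount.get();
    }

    public static void main(String[] args) throws InterruptedException {
        StartOnceSketch server = new StartOnceSketch();
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(server::start);
            threads[i].start();
        }
        for (Thread thread : threads) {
            thread.join();
        }
        // However the threads interleave, the start body ran exactly once.
        System.out.println(server.startCount());
    }
}
```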
The export method of ProtocolFilterWrapper likewise simply calls the export method of RegistryProtocol.
The logic of org.apache.dubbo.registry.integration.RegistryProtocol#export is as follows:
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
    URL registryUrl = getRegistryUrl(originInvoker);
    // url to export locally
    URL providerUrl = getProviderUrl(originInvoker);
    // Subscribe the override data
    // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
    // the same service. Because the subscribed is cached key with the name of the service, it causes the
    // subscription information to cover.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
    // create the zk listener for override data
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
    //export invoker
    final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); // export the service locally; with the dubbo protocol this starts the Netty server
    // url to registry
    final Registry registry = getRegistry(originInvoker); // create the registry instance
    final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
    ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
            registryUrl, registeredProviderUrl);
    //to judge if we need to delay publish
    boolean register = registeredProviderUrl.getParameter("register", true);
    if (register) {
        register(registryUrl, registeredProviderUrl); // register the service
        providerInvokerWrapper.setReg(true);
    }
    // Deprecated! Subscribe to override rules in 2.6.x or before.
    // this watches for changes under the service's configurators path
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
    exporter.setRegisterUrl(registeredProviderUrl);
    exporter.setSubscribeUrl(overrideSubscribeUrl);
    //Ensure that a new exporter instance is returned every time export
    return new DestroyableExporter<>(exporter);
}
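Note that this method does more than register with and subscribe to zk; it also exports the service locally and starts the local server.
The local export logic is in org.apache.dubbo.registry.integration.RegistryProtocol#doLocalExport: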
final Invoker<?> invokerDelegete = new InvokerDelegate<T>(originInvoker, providerUrl);
// Note: protocol here is not just a bare protocol implementation; it may be wrapped by wrapper classes (ProtocolListenerWrapper, ProtocolFilterWrapper)
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
Note that providerUrl is no longer the registryUrl from before. Its value is
dubbo://169.254.37.154:20891/com.wss.intf.UserInfo?anyhost=true&application=user-provider&bind.ip=169.254.37.154&bind.port=20891&dubbo=2.0.2&generic=false&interface=com.wss.intf.UserInfo&methods=sayHello,getInfo&pid=6884&release=2.7.0&side=provider&timestamp=1614478602081
The protocol of providerUrl is dubbo, so protocol.export(invokerDelegete) takes a different path this time.
Above, the protocol of registryURL was registry, so protocol.export resolved to ProtocolListenerWrapper(QosProtocolWrapper(ProtocolFilterWrapper(RegistryProtocol))). Now the protocol of providerUrl is dubbo, so it resolves to
ProtocolListenerWrapper(QosProtocolWrapper(ProtocolFilterWrapper(DubboProtocol)))
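The relationship between the two URLs can be checked by hand: the provider URL travels inside the registry URL as the URL-encoded export parameter, and decoding it yields a URL whose protocol is dubbo. The sketch below does this with plain string handling and java.net.URLDecoder. It is not how RegistryProtocol#getProviderUrl is implemented, the helper names are hypothetical, and the sample URL is a shortened version of the one shown earlier.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;

public class ExportParamSketch {
    /** Returns the decoded value of the "export" query parameter, or null if absent. */
    static String exportedUrl(String registryUrl) {
        int q = registryUrl.indexOf('?');
        if (q < 0) {
            return null;
        }
        for (String pair : registryUrl.substring(q + 1).split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0 && pair.substring(0, eq).equals("export")) {
                try {
                    return URLDecoder.decode(pair.substring(eq + 1), "UTF-8");
                } catch (UnsupportedEncodingException e) {
                    throw new IllegalStateException(e); // UTF-8 is always available
                }
            }
        }
        return null;
    }

    /** Returns the scheme in front of "://", which is what selects the Protocol extension. */
    static String protocolOf(String url) {
        int i = url.indexOf("://");
        return i < 0 ? null : url.substring(0, i);
    }

    public static void main(String[] args) {
        String registryUrl = "registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService"
                + "?application=userprovider"
                + "&export=dubbo%3A%2F%2F169.254.37.154%3A20891%2Fcom.wss.intf.UserInfo%3Fside%3Dprovider"
                + "&registry=zookeeper";
        String providerUrl = exportedUrl(registryUrl);
        System.out.println(protocolOf(registryUrl)); // first pass dispatches on this
        System.out.println(providerUrl);
        System.out.println(protocolOf(providerUrl)); // second pass dispatches on this
    }
}
```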
The logic of org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper#export is as follows:
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        return protocol.export(invoker);
    }
    // e.g. for injvm the inner exporter is an InjvmExporter; when ListenerExporterWrapper is constructed it calls the listeners' exported method
    return new ListenerExporterWrapper<T>(protocol.export(invoker),
            Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                    .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
}
As shown, the return value of ProtocolListenerWrapper wraps the return value of QosProtocolWrapper's export method.
The logic of org.apache.dubbo.qos.protocol.QosProtocolWrapper#export is as follows:
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        startQosServer(invoker.getUrl());
        return protocol.export(invoker);
    }
    return protocol.export(invoker);
}
For the dubbo protocol this simply calls the export method of ProtocolFilterWrapper.
The logic of org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper#export is as follows:
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
        return protocol.export(invoker);
    }
    return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper#buildInvokerChain
// consumer: key is reference.filter, group is consumer
// provider: key is service.filter, group is provider
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker; // InvokerDelegate
    // If the url contains service.filter, e.g. dubbo://192.168.1.7:9090/com.alibaba.service1?service.filter=-default,value1,
    // the default filters are removed and value1 (a key in the SPI file) is added (the url here has no service.filter).
    // Filters whose @Activate annotation declares the group provider also qualify.
    // Only filters that meet one of these two conditions are selected.
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (!filters.isEmpty()) {
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new Invoker<T>() {
                @Override
                public Class<T> getInterface() {
                    return invoker.getInterface();
                }
                @Override
                public URL getUrl() {
                    return invoker.getUrl();
                }
                @Override
                public boolean isAvailable() {
                    return invoker.isAvailable();
                }
                @Override
                public Result invoke(Invocation invocation) throws RpcException {
                    // filter is the i-th filter; next is the invoker built in the previous iteration (the original invoker for the last filter)
                    Result result = filter.invoke(next, invocation);
                    if (result instanceof AsyncRpcResult) {
                        AsyncRpcResult asyncResult = (AsyncRpcResult) result;
                        asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation));
                        return asyncResult;
                    } else {
                        return filter.onResponse(result, invoker, invocation);
                    }
                }
                @Override
                public void destroy() {
                    invoker.destroy();
                }
                @Override
                public String toString() {
                    return invoker.toString();
                }
            };
        }
    }
    return last; // head of the chain, e.g. MyFilter, MonitorFilter, FutureFilter, ConsumerContextFilter (this example list is a consumer-side chain)
}
When the method returns, the filter chain has been built.
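The loop runs from the last filter to the first so that the first filter ends up as the head of the chain. A reduced sketch makes the resulting call order visible. The Invoker and Filter interfaces below are hypothetical one-method versions, and the tracing filters only record their names before passing the call on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FilterChainSketch {
    /** Hypothetical, reduced Invoker. */
    interface Invoker {
        String invoke(String invocation);
    }

    /** Hypothetical, reduced Filter: receives the next invoker in the chain. */
    interface Filter {
        String invoke(Invoker next, String invocation);
    }

    /** Same shape as buildInvokerChain: iterate backwards, wrapping the previous link each time. */
    static Invoker buildInvokerChain(Invoker invoker, List<Filter> filters) {
        Invoker last = invoker;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker next = last;
            last = invocation -> filter.invoke(next, invocation);
        }
        return last;
    }

    /** A filter that records its name and then passes the call on. */
    static Filter tracing(String name, List<String> trace) {
        return (next, invocation) -> {
            trace.add(name);
            return next.invoke(invocation);
        };
    }

    /** Builds a two-filter chain, invokes it once, and returns the recorded call order. */
    static List<String> demo() {
        List<String> trace = new ArrayList<>();
        List<Filter> filters = Arrays.asList(tracing("A", trace), tracing("B", trace));
        Invoker service = invocation -> {
            trace.add("service");
            return "ok";
        };
        buildInvokerChain(service, filters).invoke("sayHello");
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [A, B, service]
    }
}
```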
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#export
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
    URL url = invoker.getUrl();
    // export service.
    String key = serviceKey(url);
    DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    exporterMap.put(key, exporter);
    //export an stub service for dispatching event
    Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
    Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
    if (isStubSupportEvent && !isCallbackservice) {
        String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
        if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
            if (logger.isWarnEnabled()) {
                logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                        "], has set stubproxy support event ,but no stub methods founded."));
            }
        } else {
            stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
        }
    }
    // start the Netty server
    openServer(url);
    optimizeSerialization(url);
    return exporter;
}
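The exporterMap.put(key, exporter) call is what later lets an incoming request be routed to the right Invoker: the server side computes the same key from the request and looks the exporter up. A minimal sketch of that idea follows. The key format "interface:port" is a simplifying assumption (Dubbo's real service key carries more information), and the Invoker interface and method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ExporterMapSketch {
    /** Hypothetical, reduced Invoker. */
    interface Invoker {
        String invoke(String method);
    }

    // Stands in for DubboProtocol's exporterMap; values are invokers instead of exporters.
    static final Map<String, Invoker> EXPORTER_MAP = new ConcurrentHashMap<>();

    /** Assumed, simplified key format: interface name plus port. */
    static String serviceKey(String interfaceName, int port) {
        return interfaceName + ":" + port;
    }

    /** Export: remember the invoker under its service key. */
    static void export(String interfaceName, int port, Invoker invoker) {
        EXPORTER_MAP.put(serviceKey(interfaceName, port), invoker);
    }

    /** Request handling: rebuild the key from the request and look the invoker up. */
    static String handleRequest(String interfaceName, int port, String method) {
        Invoker invoker = EXPORTER_MAP.get(serviceKey(interfaceName, port));
        if (invoker == null) {
            throw new IllegalStateException("Not found exported service: " + serviceKey(interfaceName, port));
        }
        return invoker.invoke(method);
    }

    public static void main(String[] args) {
        export("com.wss.intf.UserInfo", 20891, method -> "invoked " + method);
        System.out.println(handleRequest("com.wss.intf.UserInfo", 20891, "sayHello"));
    }
}
```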
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#openServer
private void openServer(URL url) {
    ....
    createServer(url)
    ....
}
private ExchangeServer createServer(URL url) {
    // send readonly event when server closes, it's enabled by default
    url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
    // enable heartbeat by default
    url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
    String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported server type: " + str + ", url: " + url);
    }
    url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
    ExchangeServer server;
    try {
        // this actually calls the bind method of HeaderExchanger
        server = Exchangers.bind(url, requestHandler);
    } catch (RemotingException e) {
        throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
    }
    str = url.getParameter(Constants.CLIENT_KEY);
    if (str != null && str.length() > 0) {
        Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
        if (!supportedTypes.contains(str)) {
            throw new RpcException("Unsupported client type: " + str);
        }
    }
    return server;
}
org.apache.dubbo.remoting.exchange.support.header.HeaderExchanger#bind
// handler is org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol#requestHandler, an anonymous implementation class.
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
Finally, the NettyServer is started and begins serving requests.
Original article: https://www.cnblogs.com/mwss/p/14458678.html