标签:dubbo
本节内容主要了解下DUBBO服务端的暴露服务过程。
吐下槽,这个Exporter命名个人感觉不好。
Export,在DUBBO中既代表暴露过程,这儿是动作,对应export方法。
同时,Exporter接口,却代表着暴露的信息,比如URL。 总之,在研究过程中,对自己的理解代理干扰。
首先分享下DUBBO官网的暴露服务时序图
吐下槽,虽然该图来自官网,但有些信息和实际代码有些出入。
不管怎样,从整体上来看,可以帮助我们了解整体过程。个人也花了些时间,对该图细化下。两图可以互相参照着看。下图,是个人花时间整理的。
该图,比官网提供的图描述更细,当然也只是把主体调用过程进行了罗列。个人按习惯分为三部分。
section 1: 生成代理,构造Invoker对象;
section 2: Protocol调用链层,实现功能动态扩展
section 3: 负责socket端口绑定,借助底层netty,mina框架实现具体网络交互工作。
接下来,从section 2开发剖析。
2.Protocol协议
Protocol是将Invoker暴露的入口。
2.1 层次结构图
2.2 Protocol API
Protocol被@SPI注解,这说明该接口是动态适配的。
@SPI("dubbo")
public interface Protocol {
/**
* 获取缺省端口,当用户没有配置端口时使用。
*
* @return 缺省端口
*/
int getDefaultPort();
/**
* 暴露远程服务:<br>
*/
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
/**
* 引用远程服务:<br>
*/
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
/**
* 释放协议:<br>
*/
void destroy();
}filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper registry=com.alibaba.dubbo.registry.integration.RegistryProtocol injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol # com.alibaba.dubbo.rpc.protocol.http.HttpProtocol?? thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol # com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol memcached=memcom.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol
简单把Protocol实现分为2大类。
filter,listener,register 采用代理模式,主要实现Protocol的扩展,调用具体protocal实现类完成。
其他,如dubbo,injvm,hession等,负责具体通讯工作。
3. 过程分析
3.1 测试代码(来自测试类DubboProtocolTest)
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
ProxyFactory proxy = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
DemoService service = new DemoServiceImpl();
protocol.export(proxy.getInvoker(service, DemoService.class, URL.valueOf("dubbo://127.0.0.1:9010/" + DemoService.class.getName())));3.2 Protocol选择
由ExtensionLoader负责完成。具体代码片段
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
最后的protocol是个包装类。如下图
至于具体解析过程,参见《Dubbo原理解析-Dubbo内核实现之基于SPI思想Dubbo内核实现 》
3.3 ProtocolFilterWrapper VS ProtocolListenerWrapper
通过图,可以知道
Listener,Filter ,DUBBO提供了动态扩展。
ProtocolFilterWrapper 和 ProtocolListenerWrapper都是使用代理模式,实现功能扩展。但两者从使用场景上有所差别。
ProtocolFilterWrapper :主要通过构造调用链,实现功能扩展。提供了丰富的Filter接口。
ProtocolListenerWrapper:通过对服务的暴露和下线进行消息通知,DUBBO提供Listener接口不多。
下面简单分享下调用时机
ListenerExporterWrapper.java
public class ListenerExporterWrapper<T> implements Exporter<T> {
private static final Logger logger = LoggerFactory.getLogger(ListenerExporterWrapper.class);
private final Exporter<T> exporter;
private final List<ExporterListener> listeners;
public ListenerExporterWrapper(Exporter<T> exporter, List<ExporterListener> listeners){
if (exporter == null) {
throw new IllegalArgumentException("exporter == null");
}
this.exporter = exporter;
this.listeners = listeners;
if (listeners != null && listeners.size() > 0) {
RuntimeException exception = null;
for (ExporterListener listener : listeners) {
if (listener != null) {
try {
listener.exported(this);
} catch (RuntimeException t) {
logger.error(t.getMessage(), t);
exception = t;
}
}
}
if (exception != null) {
throw exception;
}
}
}
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void unexport() {
try {
exporter.unexport();
} finally {
if (listeners != null && listeners.size() > 0) {
RuntimeException exception = null;
for (ExporterListener listener : listeners) {
if (listener != null) {
try {
listener.unexported(this);
} catch (RuntimeException t) {
logger.error(t.getMessage(), t);
exception = t;
}
}
}
if (exception != null) {
throw exception;
}
}
}
}
}Listener的调用,发生在export,unexport执行后进行
ProtocolFilterWrapper.java
public class ProtocolFilterWrapper implements Protocol {
private final Protocol protocol;
public ProtocolFilterWrapper(Protocol protocol){
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
}
this.protocol = protocol;
}
public int getDefaultPort() {
return protocol.getDefaultPort();
}
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}
public void destroy() {
protocol.destroy();
}
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (filters.size() > 0) {
for (int i = filters.size() - 1; i >= 0; i --) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
public Class<T> getInterface() {
return invoker.getInterface();
}
public URL getUrl() {
return invoker.getUrl();
}
public boolean isAvailable() {
return invoker.isAvailable();
}
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
}以CacheFilter.java为例
@Activate(group = {Constants.CONSUMER, Constants.PROVIDER}, value = Constants.CACHE_KEY)
public class CacheFilter implements Filter {
private CacheFactory cacheFactory;
public void setCacheFactory(CacheFactory cacheFactory) {
this.cacheFactory = cacheFactory;
}
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
if (cacheFactory != null && ConfigUtils.isNotEmpty(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.CACHE_KEY))) {
Cache cache = cacheFactory.getCache(invoker.getUrl().addParameter(Constants.METHOD_KEY, invocation.getMethodName()));
if (cache != null) {
String key = StringUtils.toArgumentString(invocation.getArguments());
if (cache != null && key != null) {
Object value = cache.get(key);
if (value != null) {
return new RpcResult(value);
}
Result result = invoker.invoke(invocation);
if (! result.hasException()) {
cache.put(key, result.getValue());
}
return result;
}
}
}
return invoker.invoke(invocation);
}
}可以看出,Filter会在包裹着服务调用。
最后总结下两者区别
| 区别 | ProtocolFilterWrapper | ProtocolListenerWrapper |
| 内部实现类 | 多 | 少 |
| 调用时机 | 服务具体调用时 | 服务暴露,服务下线时 |
| 使用场景 | RPC调用时,动态扩展 | 消息通知 |
| 上下文 | 类似Around Advise | 类似After Advise |
| 组织形式 | 调用链 | 循环遍历 |
4. 服务暴露过程主体类图
DubboProtocol:具体协议实现类,服务的暴露,调用入口
Exchangers:ExchangeServer,ExchangeClient工厂类
HeaderExchanger:Exchanger意为“交换”的意思,表示数据交换接口,连接上下两侧。
HeaderExchangeServer:提供分布式服务的服务器Exchange接口
NettyTransporter:提供网络基本接口(bind,connection)
NettyServer:借助第三方netty,对外提供端口。
本文出自 “简单” 博客,请务必保留此出处http://dba10g.blog.51cto.com/764602/1885295
标签:dubbo
原文地址:http://dba10g.blog.51cto.com/764602/1885295