以SOAP协议服务为例,当我们使用代码调用Web服务时,一般情况如下:
JaxWsProxyFactoryBean factoryBean = new JaxWsProxyFactoryBean(); String address = "...";//Web服务发布地址 factoryBean.setAddress(address); factoryBean.setServiceClass(HelloService.class);//Web服务接口 HelloService helloService = factoryBean.create(HelloService.class); helloService.sayHello("xtayfjpk");
public <ProxyServiceType> ProxyServiceType create(Class<ProxyServiceType> serviceClass) { setServiceClass(serviceClass); //看上去很简单啊,别急,操作都是create()方法中呢 return serviceClass.cast(create()); }
/**
 * Builds the client proxy object.
 *
 * <p>Steps, in order: optionally switch the thread context class loader to the one
 * registered on the bus; configure this factory; copy credentials, properties, bus and
 * data binding onto {@code clientFactoryBean}; create the {@code Client}; copy this
 * factory's interceptor lists onto the client; wrap the client in a {@code ClientProxy}
 * and expose it through a JDK dynamic proxy. The original thread context class loader
 * is restored in the {@code finally} block.
 *
 * @return the dynamic proxy created by {@code Proxy.newProxyInstance}
 */
public synchronized Object create() {
    ClassLoaderHolder orig = null;
    try {
        if (getBus() != null) {
            ClassLoader loader = getBus().getExtension(ClassLoader.class);
            if (loader != null) {
                orig = ClassLoaderUtils.setThreadContextClassloader(loader);
            }
        }
        // Configure "this", i.e. the JaxWsProxyFactoryBean object itself.
        configureObject();
        if (properties == null) {
            properties = new HashMap<String, Object>();
        }
        if (username != null) {
            AuthorizationPolicy authPolicy = new AuthorizationPolicy();
            authPolicy.setUserName(username);
            authPolicy.setPassword(password);
            properties.put(AuthorizationPolicy.class.getName(), authPolicy);
        }
        // Set the features on clientFactoryBean and the ServiceFactory.
        initFeatures();
        clientFactoryBean.setProperties(properties);
        if (bus != null) {
            clientFactoryBean.setBus(bus);
        }
        if (dataBinding != null) {
            clientFactoryBean.setDataBinding(dataBinding);
        }
        // The factory creates the Client object (implementation class: ClientImpl);
        // the Endpoint and Service objects are created along the way.
        Client c = clientFactoryBean.create();
        // Copy the various interceptor lists onto the Client.
        if (getInInterceptors() != null) {
            c.getInInterceptors().addAll(getInInterceptors());
        }
        if (getOutInterceptors() != null) {
            c.getOutInterceptors().addAll(getOutInterceptors());
        }
        if (getInFaultInterceptors() != null) {
            c.getInFaultInterceptors().addAll(getInFaultInterceptors());
        }
        if (getOutFaultInterceptors() != null) {
            c.getOutFaultInterceptors().addAll(getOutFaultInterceptors());
        }
        // Create the client proxy (the invocation handler).
        ClientProxy handler = clientClientProxy(c);
        // Interfaces the proxy must implement: the web service interface plus java.io.Closeable.
        Class<?> classes[] = getImplementingClasses();
        // This is the key step: the JDK Proxy class creates a proxy object implementing the
        // web service interface and java.io.Closeable. Its InvocationHandler is the
        // ClientProxy object created just above.
        Object obj = Proxy.newProxyInstance(clientFactoryBean.getServiceClass().getClassLoader(),
                                            classes, handler);
        this.getServiceFactory().sendEvent(FactoryBeanListener.Event.PROXY_CREATED, classes, handler, obj);
        return obj;
    } finally {
        // Restore the thread context class loader if it was swapped above.
        if (orig != null) {
            orig.reset();
        }
    }
}
/**
 * Invocation handler entry point: called for every method invoked on the client proxy.
 * (Excerpt: parts of the original method are omitted by the article's author.)
 *
 * @param proxy  the proxy instance the method was invoked on
 * @param method the invoked interface method
 * @param args   the call arguments; may be {@code null}, in which case an empty array is used
 * @return the result of the synchronous invocation, passed through {@code adjustObject}
 * @throws Throwable whatever the invocation raises
 */
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    // ... (omitted)
    // Look up the MethodDispatcher registered on the service and resolve the
    // BindingOperationInfo that corresponds to the invoked method.
    MethodDispatcher dispatcher = (MethodDispatcher)endpoint.getService().get(MethodDispatcher.class
                                                                                  .getName());
    BindingOperationInfo oi = dispatcher.getBindingOperation(method, endpoint);
    // ... (omitted)
    Object[] params = args;
    if (null == params) {
        params = new Object[0];
    }
    // Perform the synchronous call.
    Object o = invokeSync(method, oi, params);
    //call a virtual method passing the object.  This causes the IBM JDK
    //to keep the "this" pointer references and thus "this" doesn't get
    //finalized in the midst of an invoke operation
    return adjustObject(o);
}
/**
 * Performs one client invocation: builds the outgoing message and exchange, runs the
 * outbound interceptor chain and, for calls without a callback, processes the result.
 *
 * @param callback callback for asynchronous calls; {@code null} marks the exchange synchronous
 * @param oi       binding operation being invoked; may be {@code null}
 * @param params   operation arguments
 * @param context  invocation context holding the request/response context maps;
 *                 created here when {@code null}
 * @param exchange exchange to use; created here when {@code null}
 * @return {@code null} when a callback is supplied, otherwise the value of
 *         {@code processResult}
 * @throws Exception a {@code Fault} from the interceptor chain is enriched and rethrown
 */
private Object[] doInvoke(ClientCallback callback, BindingOperationInfo oi, Object[] params,
                          Map<String, Object> context, Exchange exchange) throws Exception {
    Bus origBus = BusFactory.getAndSetThreadDefaultBus(bus);
    ClassLoaderHolder origLoader = null;
    try {
        ClassLoader loader = bus.getExtension(ClassLoader.class);
        if (loader != null) {
            origLoader = ClassLoaderUtils.setThreadContextClassloader(loader);
        }
        if (exchange == null) {
            // Create the Exchange object.
            exchange = new ExchangeImpl();
        }
        exchange.setSynchronous(callback == null);
        Endpoint endpoint = getEndpoint();
        if (LOG.isLoggable(Level.FINE)) {
            LOG.fine("Invoke, operation info: " + oi + ", params: " + Arrays.toString(params));
        }
        // Create the out message.
        Message message = endpoint.getBinding().createMessage();

        // Make sure INVOCATION CONTEXT, REQUEST_CONTEXT and RESPONSE_CONTEXT are present
        // on message
        // The invocation, request and response context maps are placed in the
        // corresponding slots of the Message and the Exchange.
        Map<String, Object> reqContext = null;
        Map<String, Object> resContext = null;
        if (context == null) {
            context = new HashMap<String, Object>();
        }
        reqContext = CastUtils.cast((Map<?, ?>)context.get(REQUEST_CONTEXT));
        resContext = CastUtils.cast((Map<?, ?>)context.get(RESPONSE_CONTEXT));
        if (reqContext == null) {
            reqContext = new HashMap<String, Object>(getRequestContext());
            context.put(REQUEST_CONTEXT, reqContext);
        }
        if (resContext == null) {
            resContext = new HashMap<String, Object>();
            context.put(RESPONSE_CONTEXT, resContext);
        }
        message.put(Message.INVOCATION_CONTEXT, context);
        setContext(reqContext, message);
        exchange.putAll(reqContext);
        // Set the parameters, wrapping them in a MessageContentsList object.
        // In CXF both the arguments and the return value of a web service call are
        // wrapped in this data type.
        setParameters(params, message);
        if (null != oi) {
            // Mark the Exchange one-way when the operation has no output.
            exchange.setOneWay(oi.getOutput() == null);
        }
        // Use the Message as the Exchange's out message.
        exchange.setOutMessage(message);
        // Set the callback; normally null.
        exchange.put(ClientCallback.class, callback);
        // Set message properties.
        setOutMessageProperties(message, oi);
        // Set Exchange properties. Many are set (see the CXF source for the full list);
        // the most important one is the MessageObserver:
        //     exchange.put(MessageObserver.class, this);
        // i.e. ClientImpl itself becomes the message observer, so when the web service
        // response comes back, ClientImpl.onMessage is called.
        setExchangeProperties(exchange, endpoint, oi);
        // Collect the out interceptors of Bus, Client, Endpoint, Binding and DataBinding
        // into the interceptor chain.
        PhaseInterceptorChain chain = setupInterceptorChain(endpoint);
        // Attach the chain to the Message.
        message.setInterceptorChain(chain);
        // Set the fault observer (i.e. the fault handler) of the chain.
        chain.setFaultObserver(outFaultObserver);
        // Prepare the ConduitSelector, which internally maintains a list of Conduits. If no
        // ConduitSelector was set on JaxWsProxyFactoryBean,
        // org.apache.cxf.endpoint.UpfrontConduitSelector is used by default. The selector
        // picks or creates a Conduit based on the Message's settings.
        // A Conduit is the transport-layer pipe CXF uses to move data between client and
        // server; it encapsulates the communication protocol between the two.
        // For calls made through JaxWsProxyFactoryBean the conduit is
        // org.apache.cxf.transport.http.URLConnectionHTTPConduit, a conduit based on
        // java.net.URLConnection: all client/server traffic ultimately goes through a
        // URLConnection object.
        prepareConduitSelector(message);
        // Add extra interceptors, depending on whether the message carries interceptor providers.
        modifyChain(chain, message, false);
        try {
            // Run the chain: doIntercept() calls handleMessage() of each interceptor in turn.
            chain.doIntercept(message);
        } catch (Fault fault) {
            enrichFault(fault);
            throw fault;
        }
        if (callback != null) {
            return null;
        } else {
            // Process the result of the call.
            return processResult(message, exchange, oi, resContext);
        }
    } finally {
        // Restore the thread context class loader and the thread default bus.
        if (origLoader != null) {
            origLoader.reset();
        }
        if (origBus != bus) {
            BusFactory.setThreadDefaultBus(origBus);
        }
    }
}
上面的序号也是拦截器被执行的顺序,其中第7、9、12、16号拦截器非常重要,这里将作详细解释,由于拦截器太多,其它的就得自己看了。
org.apache.cxf.interceptor.MessageSenderInterceptor:
public void handleMessage(Message message) { try { getConduit(message).prepare(message); } catch (IOException ex) { throw new Fault(new org.apache.cxf.common.i18n.Message("COULD_NOT_SEND", BUNDLE), ex); } //添加org.apache.cxf.interceptor.MessageSenderInterceptor$MessageSenderEndingInterceptor到链中 message.getInterceptorChain().add(ending); }
/**
 * Prepares the conduit for sending {@code message}: resolves the target URI, sets up
 * the connection, defaults the HTTP method to POST and stores an {@code OutputStream}
 * on the message. (Excerpt: part of the original method is omitted by the article's
 * author; {@code isChunking} and {@code chunkThreshold} are declared in the omitted part.)
 *
 * @param message the outgoing message
 * @throws IOException if the URI is syntactically invalid (wraps the
 *         {@code URISyntaxException}) or connection/stream setup fails
 */
public void prepare(Message message) throws IOException {
    // This call can possibly change the conduit endpoint address and
    // protocol from the default set in EndpointInfo that is associated
    // with the Conduit.
    URI currentURI;
    try {
        // Determine the URI to access.
        currentURI = setupURI(message);
    } catch (URISyntaxException e) {
        throw new IOException(e);
    }
    // The need to cache the request is off by default
    boolean needToCacheRequest = false;
    HTTPClientPolicy csPolicy = getClient(message);
    // Obtain and configure the connection. This method is abstract; the implementation
    // is in URLConnectionHTTPConduit.
    setupConnection(message, currentURI, csPolicy);
    // Determine the request method (GET or POST); POST is used by default.
    String httpRequestMethod = (String)message.get(Message.HTTP_REQUEST_METHOD);
    if (httpRequestMethod == null) {
        httpRequestMethod = "POST";
        message.put(Message.HTTP_REQUEST_METHOD, "POST");
    }
    // ... (omitted)
    // Create the OutputStream used to send the request to the server and store it on
    // the Message. createOutputStream is also abstract; the implementation is in
    // URLConnectionHTTPConduit.
    message.setContent(OutputStream.class,
                       createOutputStream(message, needToCacheRequest, isChunking, chunkThreshold));
    // At this point all preparation is done and the message can be sent.
}
protected void setupConnection(Message message, URI currentURL, HTTPClientPolicy csPolicy) throws IOException { //创建出HttpURLConnection HttpURLConnection connection = createConnection(message, currentURL, csPolicy); //设置输出标志 connection.setDoOutput(true); //计算连接超时时间 int ctimeout = determineConnectionTimeout(message, csPolicy); connection.setConnectTimeout(ctimeout); //计算读取超时时间 int rtimeout = determineReceiveTimeout(message, csPolicy); connection.setReadTimeout(rtimeout); //不使用缓存 connection.setUseCaches(false); //因为URLConnectionHTTPConduit自己实现了重定向功能,所以不再使用HttpURLConnection的重定向 connection.setInstanceFollowRedirects(false); // 如果HTTP_REQUEST_METHOD没有设置,则默认为POST String httpRequestMethod = (String)message.get(Message.HTTP_REQUEST_METHOD); if (httpRequestMethod == null) { httpRequestMethod = "POST"; message.put(Message.HTTP_REQUEST_METHOD, "POST"); } //设置请求方法 connection.setRequestMethod(httpRequestMethod); //将HttpURLConnection放置在Message中,给创建输出流的时候使用 message.put(KEY_HTTP_CONNECTION, connection); } protected OutputStream createOutputStream(Message message, boolean needToCacheRequest, boolean isChunking, int chunkThreshold) throws IOException { //取出在setupConnection方法中放置在Message中的HttpURLConnection对象 HttpURLConnection connection = (HttpURLConnection)message.get(KEY_HTTP_CONNECTION); if (isChunking && chunkThreshold <= 0) { chunkThreshold = 0; connection.setChunkedStreamingMode(-1); } try { //返回一个包装好的输出流 return new URLConnectionWrappedOutputStream(message, connection, needToCacheRequest, isChunking, chunkThreshold, getConduitName()); } catch (URISyntaxException e) { throw new IOException(e); } }
org.apache.cxf.interceptor.StaxOutInterceptor:
public void handleMessage(Message message) { OutputStream os = message.getContent(OutputStream.class); XMLStreamWriter xwriter = message.getContent(XMLStreamWriter.class); Writer writer = null; if (os == null) { writer = message.getContent(Writer.class); } if ((os == null && writer == null) || xwriter != null) { return; } String encoding = getEncoding(message); try { XMLOutputFactory factory = getXMLOutputFactory(message); if (factory == null) { if (writer == null) { os = setupOutputStream(message, os); xwriter = StaxUtils.createXMLStreamWriter(os, encoding); } else { xwriter = StaxUtils.createXMLStreamWriter(writer); } } else { synchronized (factory) { if (writer == null) { os = setupOutputStream(message, os); xwriter = factory.createXMLStreamWriter(os, encoding); } else { xwriter = factory.createXMLStreamWriter(writer); } } } if (MessageUtils.getContextualBoolean(message, FORCE_START_DOCUMENT, false)) { xwriter.writeStartDocument(encoding, "1.0"); message.removeContent(OutputStream.class); message.put(OUTPUT_STREAM_HOLDER, os); message.removeContent(Writer.class); message.put(WRITER_HOLDER, writer); } } catch (XMLStreamException e) { throw new Fault(new org.apache.cxf.common.i18n.Message("STREAM_CREATE_EXC", BUNDLE), e); } //将XMLStreamWriter设置进Message中 message.setContent(XMLStreamWriter.class, xwriter); // Add a final interceptor to write end elements message.getInterceptorChain().add(ENDING); }
org.apache.cxf.interceptor.BareOutInterceptor:
public void handleMessage(Message message) { Exchange exchange = message.getExchange(); BindingOperationInfo operation = (BindingOperationInfo)exchange.get(BindingOperationInfo.class.getName()); if (operation == null) { return; } //获取参数列表 MessageContentsList objs = MessageContentsList.getContentsList(message); if (objs == null || objs.size() == 0) { return; } List<MessagePartInfo> parts = null; BindingMessageInfo bmsg = null; //现在是客户端,则为true boolean client = isRequestor(message); if (!client) { if (operation.getOutput() != null) { bmsg = operation.getOutput(); parts = bmsg.getMessageParts(); } else { // partial response to oneway return; } } else { bmsg = operation.getInput(); //获取消息内容 parts = bmsg.getMessageParts(); } //将消息内容通过XMLStreamWriter以XML格式写入输出流,最终通过URLConnection写给服务端 writeParts(message, exchange, operation, objs, parts); }
org.apache.cxf.interceptor.MessageSenderInterceptor$MessageSenderEndingInterceptor,该拦截器是在执行MessageSenderInterceptor时添加到拦截器链中的。
/**
 * Closes the message on its conduit.
 *
 * @param message the outgoing message
 * @throws Fault with key {@code COULD_NOT_SEND} when closing fails with an
 *         {@code IOException}
 */
public void handleMessage(Message message) throws Fault {
    try {
        getConduit(message).close(message);
    } catch (IOException closeFailure) {
        final org.apache.cxf.common.i18n.Message reason =
            new org.apache.cxf.common.i18n.Message("COULD_NOT_SEND", BUNDLE);
        throw new Fault(reason, closeFailure);
    }
}
/**
 * Handles the HTTP response: records the response code, builds the in message with
 * headers, encoding and input stream, and hands it to the incoming observer.
 * (Excerpt: the one-way / HTTP 202 branch is omitted by the article's author.)
 *
 * @throws IOException for response codes {@code >= 400} other than 500 (as an
 *         {@code HTTPException}, unless the "org.apache.cxf.http.no_io_exceptions"
 *         property is true), or when the response charset cannot be mapped
 */
protected void handleResponseInternal() throws IOException {
    // Get the Exchange object.
    Exchange exchange = outMessage.getExchange();
    // Get the response status code.
    int responseCode = getResponseCode();
    if (responseCode == -1) {
        LOG.warning("HTTP Response code appears to be corrupted");
    }
    if (exchange != null) {
        exchange.put(Message.RESPONSE_CODE, responseCode);
    }
    // Error handling.
    boolean noExceptions = MessageUtils.isTrue(outMessage.getContextualProperty(
        "org.apache.cxf.http.no_io_exceptions"));
    if (responseCode >= 400 && responseCode != 500 && !noExceptions) {
        throw new HTTPException(responseCode, getResponseMessage(), url.toURL());
    }
    InputStream in = null;
    // Create the in message, i.e. the response message: the request has been submitted,
    // so the next step is receiving the server's response.
    Message inMessage = new MessageImpl();
    inMessage.setExchange(exchange);
    // Update the response headers.
    updateResponseHeaders(inMessage);
    inMessage.put(Message.RESPONSE_CODE, responseCode);
    if (isOneway(exchange) || HttpURLConnection.HTTP_ACCEPTED == responseCode) {
        // ... (omitted; in the walkthrough the Exchange is not one-way)
    } else {
        //not going to be resending or anything, clear out the stuff in the out message
        //to free memory
        // Remove the output stream from outMessage to release the out message;
        // its work is done at this point.
        outMessage.removeContent(OutputStream.class);
        if (cachingForRetransmission && cachedStream != null) {
            cachedStream.close();
        }
        cachedStream = null;
    }
    // Determine the encoding.
    String charset = HttpHeaderHelper.findCharset((String)inMessage.get(Message.CONTENT_TYPE));
    String normalizedEncoding = HttpHeaderHelper.mapCharset(charset);
    if (normalizedEncoding == null) {
        String m = new org.apache.cxf.common.i18n.Message("INVALID_ENCODING_MSG",
                                                           LOG, charset).toString();
        LOG.log(Level.WARNING, m);
        throw new IOException(m);
    }
    inMessage.put(Message.ENCODING, normalizedEncoding);
    if (in == null) {
        // Obtain the input stream. Implemented in URLConnectionWrappedOutputStream:
        // the URLConnection returns an InputStream object.
        in = getInputStream();
    }
    if (in == null) {
        // Create an empty stream to avoid NullPointerExceptions
        in = new ByteArrayInputStream(new byte[] {});
    }
    // Store the input stream on the in message for later use.
    inMessage.setContent(InputStream.class, in);
    // Call onMessage on the in-message observer (an application of the observer pattern).
    incomingObserver.onMessage(inMessage);
}
/**
 * Observer callback invoked with the incoming (response) message: wraps it via the
 * endpoint's binding, registers it as the exchange's in message and runs the inbound
 * interceptor chain.
 * (Excerpt: large parts are omitted by the article's author; in particular the code that
 * builds {@code chain} is not shown, so this excerpt does not compile as-is.)
 *
 * @param message the incoming message
 */
public void onMessage(Message message) {
    Endpoint endpoint = message.getExchange().getEndpoint();
    // ... (omitted)
    // Wrap the MessageImpl as a SOAPMessage.
    message = endpoint.getBinding().createMessage(message);
    message.getExchange().setInMessage(message);
    message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
    message.put(Message.INBOUND_MESSAGE, Boolean.TRUE);
    // ... (omitted)
    InterceptorChain chain;
    // As with the out chain: the in interceptors of Bus, Client, Endpoint, Binding and
    // DataBinding are collected (the collecting code is omitted here).
    message.setInterceptorChain(chain);
    chain.setFaultObserver(outFaultObserver);
    modifyChain(chain, message, true);
    modifyChain(chain, message.getExchange().getOutMessage(), true);
    Bus origBus = BusFactory.getAndSetThreadDefaultBus(bus);
    // execute chain
    ClientCallback callback = message.getExchange().get(ClientCallback.class);
    try {
        // ... (a lot of code omitted)
        if(callback!=null) {
            //do something with callback
        }
        // Run the in chain: doIntercept calls handleMessage of each interceptor in turn.
        chain.doIntercept(message);
    } finally {
        // ... (omitted)
    }
}
/**
 * Waits for the response if needed, completes the conduit, publishes the response
 * context and extracts the result objects from the in message.
 * (Excerpt: the exception-handling parts are omitted by the article's author.)
 *
 * @param message    the out message of the call
 * @param exchange   the exchange of the call
 * @param oi         the invoked binding operation; may be {@code null}
 * @param resContext response context map to fill from the in message; may be {@code null}
 * @return the in message's {@code List} content as an array, or {@code null} when there
 *         is no such content or when a 202 response was received and the endpoint has no
 *         decoupled destination property
 * @throws Exception from waiting for or processing the response
 */
protected Object[] processResult(Message message, Exchange exchange,
                                 BindingOperationInfo oi, Map<String, Object> resContext)
    throws Exception {
    // Exception handling ... (omitted)
    // Handle a 202 response.
    Integer responseCode = (Integer)exchange.get(Message.RESPONSE_CODE);
    if (null != responseCode && 202 == responseCode) {
        Endpoint ep = exchange.getEndpoint();
        if (null != ep && null != ep.getEndpointInfo() && null == ep.getEndpointInfo().
            getProperty("org.apache.cxf.ws.addressing.MAPAggregator.decoupledDestination")) {
            return null;
        }
    }
    // Wait for a response if we need to
    if (oi != null && !oi.getOperationInfo().isOneWay()) {
        synchronized (exchange) {
            // Wait for the server's response.
            waitResponse(exchange);
        }
    }
    // Whether to keep the conduit open; when the property is absent or false the
    // conduit is completed.
    Boolean keepConduitAlive = (Boolean)exchange.get(Client.KEEP_CONDUIT_ALIVE);
    if (keepConduitAlive == null || !keepConduitAlive) {
        // Per the article's author this closes the input stream.
        getConduitSelector().complete(exchange);
    }
    // Grab the response objects if there are any
    List<Object> resList = null;
    Message inMsg = exchange.getInMessage();
    if (inMsg != null) {
        if (null != resContext) {
            resContext.putAll(inMsg);
            // remove the recursive reference if present
            resContext.remove(Message.INVOCATION_CONTEXT);
            responseContext.put(Thread.currentThread(), resContext);
        }
        // Get the List-typed content of the message, i.e. the MessageContentsList.
        resList = CastUtils.cast(inMsg.getContent(List.class));
    }
    // Exception handling ... (omitted)
    if (resList != null) {
        // Return the results as an Object[].
        return resList.toArray();
    }
    return null;
}
public Object invokeSync(Method method, BindingOperationInfo oi, Object[] params) throws Exception { if (client == null) { throw new IllegalStateException("The client has been closed."); } //client实现类就是ClientImpl Object rawRet[] = client.invoke(oi, params); if (rawRet != null && rawRet.length > 0) { //取返回结果Object[]中的第一个元素,得到最终的返回结果 return rawRet[0]; } else { return null; } }
原文地址:http://blog.csdn.net/xtayfjpk/article/details/45221055