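HttpClient originated as a subproject of Apache Jakarta Commons (it now lives in Apache HttpComponents). It provides an efficient, up-to-date, feature-rich client-side toolkit for the HTTP protocol and tracks the latest protocol versions and recommendations. HttpClient supports the following features: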
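Supports HTTP/0.9, HTTP/1.0, and HTTP/1.1.
Implements all HTTP methods (GET, POST, PUT, HEAD, and so on).
Supports HTTPS.
Supports proxy servers.
Provides authentication schemes.
Provides a connection pool so that connections can be reused.
The connection manager supports multithreaded applications: it lets you set the maximum total number of connections and the maximum number per host, and it detects and closes expired connections.
Uses keep-alive to maintain persistent connections over HTTP/1.0 and HTTP/1.1.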
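This article takes a brief look at the HttpClient source. The source package is about 700 KB, so only the key parts are covered here; for the details, download the latest HttpClient code and study it. Note: all the source quoted in this article comes from HttpClient 4.5.
HttpClient usually works through a connection pool so that TCP connections can be reused. With persistent connections in particular this saves a good deal of overhead, because the three-way handshake and four-way teardown are not repeated. The pool's logic is as follows: it first looks for an idle connection keyed by route (essentially the target host); if there is one, it is taken out and used; if not, a new connection is created and bound to the route. Looking connections up by route is what makes reuse possible: a persistent connection that is used several times against the same host only performs the handshake once. Along the way the pool also checks its size limits, the wait timeout, and so on. The source is as follows: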
    private E getPoolEntryBlocking(
            final T route, final Object state,
            final long timeout, final TimeUnit tunit,
            final PoolEntryFuture<E> future)
                throws IOException, InterruptedException, TimeoutException {

        Date deadline = null;
        if (timeout > 0) {
            deadline = new Date(System.currentTimeMillis() + tunit.toMillis(timeout));
        }

        this.lock.lock();
        try {
            final RouteSpecificPool<T, C, E> pool = getPool(route);
            E entry = null;
            while (entry == null) {
                Asserts.check(!this.isShutDown, "Connection pool shut down");
                for (;;) {
                    entry = pool.getFree(state);
                    if (entry == null) {
                        break;
                    }
                    if (entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                    } else if (this.validateAfterInactivity > 0) {
                        if (entry.getUpdated() + this.validateAfterInactivity <= System.currentTimeMillis()) {
                            if (!validate(entry)) {
                                entry.close();
                            }
                        }
                    }
                    if (entry.isClosed()) {
                        this.available.remove(entry);
                        pool.free(entry, false);
                    } else {
                        break;
                    }
                }
                if (entry != null) {
                    this.available.remove(entry);
                    this.leased.add(entry);
                    onReuse(entry);
                    return entry;
                }

                // New connection is needed
                final int maxPerRoute = getMax(route);
                // Shrink the pool prior to allocating a new connection
                final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
                if (excess > 0) {
                    for (int i = 0; i < excess; i++) {
                        final E lastUsed = pool.getLastUsed();
                        if (lastUsed == null) {
                            break;
                        }
                        lastUsed.close();
                        this.available.remove(lastUsed);
                        pool.remove(lastUsed);
                    }
                }

                if (pool.getAllocatedCount() < maxPerRoute) {
                    final int totalUsed = this.leased.size();
                    final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                    if (freeCapacity > 0) {
                        final int totalAvailable = this.available.size();
                        if (totalAvailable > freeCapacity - 1) {
                            if (!this.available.isEmpty()) {
                                final E lastUsed = this.available.removeLast();
                                lastUsed.close();
                                final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                                otherpool.remove(lastUsed);
                            }
                        }
                        final C conn = this.connFactory.create(route);
                        entry = pool.add(conn);
                        this.leased.add(entry);
                        return entry;
                    }
                }

                boolean success = false;
                try {
                    pool.queue(future);
                    this.pending.add(future);
                    success = future.await(deadline);
                } finally {
                    // In case of 'success', we were woken up by the
                    // connection pool and should now have a connection
                    // waiting for us, or else we're shutting down.
                    // Just continue in the loop, both cases are checked.
                    pool.unqueue(future);
                    this.pending.remove(future);
                }
                // check for spurious wakeup vs. timeout
                if (!success && (deadline != null) &&
                    (deadline.getTime() <= System.currentTimeMillis())) {
                    break;
                }
            }
            throw new TimeoutException("Timeout waiting for connection");
        } finally {
            this.lock.unlock();
        }
    }
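To make the lease-and-reuse idea easier to see, here is a minimal sketch of a route-keyed pool written against the JDK only. It is not HttpClient's implementation: the class RoutePoolSketch and its lease and release methods are hypothetical names, the sketch is single-threaded, and where the real pool queues the caller and waits, this one simply throws.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical, single-threaded illustration of a route-keyed connection pool. */
class RoutePoolSketch<R, C> {
    private final Map<R, Deque<C>> idle = new HashMap<>();      // free connections per route
    private final Map<R, Integer> allocated = new HashMap<>();  // free + leased per route
    private final Function<R, C> factory;                       // creates a new connection
    private final int maxPerRoute;

    RoutePoolSketch(Function<R, C> factory, int maxPerRoute) {
        this.factory = factory;
        this.maxPerRoute = maxPerRoute;
    }

    /** Reuses an idle connection for the route, or creates one if the limit allows. */
    C lease(R route) {
        Deque<C> free = idle.computeIfAbsent(route, r -> new ArrayDeque<>());
        C conn = free.pollFirst();              // most recently released first
        if (conn != null) {
            return conn;                        // reuse: no new handshake needed
        }
        int count = allocated.getOrDefault(route, 0);
        if (count >= maxPerRoute) {
            // The real pool would block here until a connection is released or a timeout hits.
            throw new IllegalStateException("No free connection for route " + route);
        }
        allocated.put(route, count + 1);
        return factory.apply(route);
    }

    /** Hands a connection back so that a later lease for the same route can reuse it. */
    void release(R route, C conn) {
        idle.computeIfAbsent(route, r -> new ArrayDeque<>()).addFirst(conn);
    }
}
```

Leasing, releasing, and leasing again for the same route returns the same connection object, which is the reuse the paragraph above describes.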
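Before sending any data, HttpClient has to open a socket connection to the backend service (a RESTful service, for example). Socket parameters such as connectTimeout and soTimeout are all configurable. Once the socket is connected, its output and input streams are opened; the output stream is used to write the request's parameters, headers, cookies, body, and so on to the backend service. The relevant source is as follows: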
    @Override
    public void connect(
            final ManagedHttpClientConnection conn,
            final HttpHost host,
            final InetSocketAddress localAddress,
            final int connectTimeout,
            final SocketConfig socketConfig,
            final HttpContext context) throws IOException {
        final Lookup<ConnectionSocketFactory> registry = getSocketFactoryRegistry(context);
        final ConnectionSocketFactory sf = registry.lookup(host.getSchemeName());
        if (sf == null) {
            throw new UnsupportedSchemeException(host.getSchemeName() +
                    " protocol is not supported");
        }
        final InetAddress[] addresses = host.getAddress() != null ?
                new InetAddress[] { host.getAddress() } : this.dnsResolver.resolve(host.getHostName());
        final int port = this.schemePortResolver.resolve(host);
        for (int i = 0; i < addresses.length; i++) {
            final InetAddress address = addresses[i];
            final boolean last = i == addresses.length - 1;

            Socket sock = sf.createSocket(context);
            sock.setSoTimeout(socketConfig.getSoTimeout());
            sock.setReuseAddress(socketConfig.isSoReuseAddress());
            sock.setTcpNoDelay(socketConfig.isTcpNoDelay());
            sock.setKeepAlive(socketConfig.isSoKeepAlive());
            final int linger = socketConfig.getSoLinger();
            if (linger >= 0) {
                sock.setSoLinger(true, linger);
            }
            conn.bind(sock);

            final InetSocketAddress remoteAddress = new InetSocketAddress(address, port);
            if (this.log.isDebugEnabled()) {
                this.log.debug("Connecting to " + remoteAddress);
            }
            try {
                sock = sf.connectSocket(
                        connectTimeout, sock, host, remoteAddress, localAddress, context);
                conn.bind(sock);
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Connection established " + conn);
                }
                return;
            } catch (final SocketTimeoutException ex) {
                if (last) {
                    throw new ConnectTimeoutException(ex, host, addresses);
                }
            } catch (final ConnectException ex) {
                if (last) {
                    final String msg = ex.getMessage();
                    if ("Connection timed out".equals(msg)) {
                        throw new ConnectTimeoutException(ex, host, addresses);
                    } else {
                        throw new HttpHostConnectException(ex, host, addresses);
                    }
                }
            } catch (final NoRouteToHostException ex) {
                if (last) {
                    throw ex;
                }
            }
            if (this.log.isDebugEnabled()) {
                this.log.debug("Connect to " + remoteAddress + " timed out. " +
                        "Connection will be retried using another IP address");
            }
        }
    }

    @Override
    public Socket connectSocket(
            final int connectTimeout,
            final Socket socket,
            final HttpHost host,
            final InetSocketAddress remoteAddress,
            final InetSocketAddress localAddress,
            final HttpContext context) throws IOException {
        final Socket sock = socket != null ? socket : createSocket(context);
        if (localAddress != null) {
            sock.bind(localAddress);
        }
        try {
            sock.connect(remoteAddress, connectTimeout);
        } catch (final IOException ex) {
            try {
                sock.close();
            } catch (final IOException ignore) {
            }
            throw ex;
        }
        return sock;
    }
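The connect code above configures the socket, then tries each resolved address in turn and only reports a failure for the last one. The sketch below reproduces that pattern with plain java.net classes. ConnectSketch, connectFirstReachable, and demo are hypothetical names, the demo assumes that loopback sockets are available, and the exception mapping of the real code (ConnectTimeoutException and so on) is left out.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: connect to the first reachable address among several candidates. */
class ConnectSketch {
    /** Tries each candidate in order; the failure is rethrown only after the last one fails. */
    static Socket connectFirstReachable(List<InetSocketAddress> candidates,
                                        int connectTimeoutMs, int soTimeoutMs) throws IOException {
        if (candidates.isEmpty()) {
            throw new IllegalArgumentException("no candidate addresses");
        }
        IOException lastFailure = null;
        for (InetSocketAddress remote : candidates) {
            Socket sock = new Socket();
            try {
                sock.setSoTimeout(soTimeoutMs);         // read timeout, like soTimeout above
                sock.setTcpNoDelay(true);
                sock.connect(remote, connectTimeoutMs); // connect timeout, like connectTimeout above
                return sock;
            } catch (IOException ex) {
                lastFailure = ex;                       // remember it and try the next address
                try {
                    sock.close();
                } catch (IOException ignore) {
                    // nothing useful to do if closing a failed socket also fails
                }
            }
        }
        throw lastFailure;
    }

    /** Loopback demo: the first candidate has no listener, the second one does. */
    static boolean demo() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket live = new ServerSocket(0, 1, loopback)) {
            ServerSocket dead = new ServerSocket(0, 1, loopback);
            int deadPort = dead.getLocalPort();
            dead.close(); // nothing listens on this port any more
            List<InetSocketAddress> candidates = Arrays.asList(
                    new InetSocketAddress(loopback, deadPort),
                    new InetSocketAddress(loopback, live.getLocalPort()));
            try (Socket sock = connectFirstReachable(candidates, 1000, 1000)) {
                return sock.isConnected() && sock.getPort() == live.getLocalPort();
            }
        } catch (IOException ex) {
            return false;
        }
    }
}
```

Setting the timeouts before calling connect mirrors the order used in the quoted source, where the socket is configured first and connected afterwards.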
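Once the connection is established, data can be sent. As mentioned above, HttpClient writes the request's parameters, headers, cookies, body, and so on to the backend service through the output stream. The source is as follows: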
    public HttpResponse execute(
            final HttpRequest request,
            final HttpClientConnection conn,
            final HttpContext context) throws IOException, HttpException {
        Args.notNull(request, "HTTP request");
        Args.notNull(conn, "Client connection");
        Args.notNull(context, "HTTP context");
        try {
            HttpResponse response = doSendRequest(request, conn, context);
            if (response == null) {
                response = doReceiveResponse(request, conn, context);
            }
            return response;
        } catch (final IOException ex) {
            closeConnection(conn);
            throw ex;
        } catch (final HttpException ex) {
            closeConnection(conn);
            throw ex;
        } catch (final RuntimeException ex) {
            closeConnection(conn);
            throw ex;
        }
    }

    @Override
    public void flush() throws IOException {
        flushBuffer();
        flushStream();
    }

    private void flushBuffer() throws IOException {
        final int len = this.buffer.length();
        if (len > 0) {
            streamWrite(this.buffer.buffer(), 0, len);
            this.buffer.clear();
            this.metrics.incrementBytesTransferred(len);
        }
    }

    private void flushStream() throws IOException {
        if (this.outstream != null) {
            this.outstream.flush();
        }
    }
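As a rough picture of what ends up on the wire, the sketch below serializes a request line, headers, and body into a buffered output stream and then flushes it, which is the same buffer-then-flush idea as the flush() code above. RequestWriterSketch, writeRequest, and render are hypothetical names; HttpClient's own session output buffer is more elaborate.

```java
import java.io.BufferedOutputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

/** Hypothetical sketch of writing an HTTP/1.1 request through a buffered stream. */
class RequestWriterSketch {
    /** Buffers the request line, headers, and body, then flushes them to the socket stream. */
    static void writeRequest(OutputStream socketOut, String method, String path,
                             Map<String, String> headers, byte[] body) throws IOException {
        OutputStream out = new BufferedOutputStream(socketOut, 8192);
        StringBuilder head = new StringBuilder();
        head.append(method).append(' ').append(path).append(" HTTP/1.1\r\n");
        for (Map.Entry<String, String> h : headers.entrySet()) {
            head.append(h.getKey()).append(": ").append(h.getValue()).append("\r\n");
        }
        head.append("\r\n");                         // blank line ends the header section
        out.write(head.toString().getBytes(StandardCharsets.US_ASCII));
        if (body != null) {
            out.write(body);
        }
        out.flush();                                 // nothing reaches the socket before this
    }

    /** Convenience for experiments: renders the request into a String instead of a socket. */
    static String render(String method, String path, Map<String, String> headers, String body) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try {
            writeRequest(sink, method, path, headers, body.getBytes(StandardCharsets.US_ASCII));
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return new String(sink.toByteArray(), StandardCharsets.US_ASCII);
    }
}
```

In real use the first argument of writeRequest would be the socket's output stream; render exists only so the result can be inspected without a network connection.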
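After the data has been sent, HttpClient reads the response from the socket's input stream. The call response = doReceiveResponse(request, conn, context) in the execute method above is what receives the socket data. The source is as follows: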
    protected HttpResponse doReceiveResponse(
            final HttpRequest request,
            final HttpClientConnection conn,
            final HttpContext context) throws HttpException, IOException {
        Args.notNull(request, "HTTP request");
        Args.notNull(conn, "Client connection");
        Args.notNull(context, "HTTP context");
        HttpResponse response = null;
        int statusCode = 0;

        while (response == null || statusCode < HttpStatus.SC_OK) {

            response = conn.receiveResponseHeader();
            if (canResponseHaveBody(request, response)) {
                conn.receiveResponseEntity(response);
            }
            statusCode = response.getStatusLine().getStatusCode();

        } // while intermediate response

        return response;
    }
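Here conn.receiveResponseHeader() reads the response headers, while conn.receiveResponseEntity(response) reads the response body. Before reading the body, the code first checks whether a response to this request with this status code may carry a body at all. The source is as follows: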
    protected boolean canResponseHaveBody(final HttpRequest request,
                                          final HttpResponse response) {

        if ("HEAD".equalsIgnoreCase(request.getRequestLine().getMethod())) {
            return false;
        }
        final int status = response.getStatusLine().getStatusCode();
        return status >= HttpStatus.SC_OK
            && status != HttpStatus.SC_NO_CONTENT
            && status != HttpStatus.SC_NOT_MODIFIED
            && status != HttpStatus.SC_RESET_CONTENT;
    }
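The two pieces above, skipping intermediate 1xx responses and deciding whether a body can follow, can be tried out in isolation with the sketch below. It replaces the connection with a queue of status codes; ResponseLoopSketch, canHaveBody, and awaitFinalStatus are hypothetical names, and the numeric literals stand in for the HttpStatus constants.

```java
import java.util.Deque;

/** Hypothetical sketch of the receive loop and the body check, using plain status codes. */
class ResponseLoopSketch {
    /** Same rule as canResponseHaveBody: no body for HEAD, 1xx, 204, 205, or 304. */
    static boolean canHaveBody(String method, int status) {
        if ("HEAD".equalsIgnoreCase(method)) {
            return false;
        }
        return status >= 200 && status != 204 && status != 304 && status != 205;
    }

    /** Reads status codes until a final one (200 or above) arrives and returns it. */
    static int awaitFinalStatus(Deque<Integer> incoming) {
        int status;
        do {
            status = incoming.removeFirst();   // stands in for conn.receiveResponseHeader()
        } while (status < 200);                // 1xx responses are intermediate, keep reading
        return status;
    }
}
```

For example, a queue holding 100, 102, 200 yields 200, and canHaveBody("HEAD", 200) is false even though the status itself would allow a body.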
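Once the response has been received it is processed further, for example to decide whether the connection should be kept alive. HttpClient makes that decision based on the protocol version of the response and on headers such as Connection. The source is as follows: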
    if (reuseStrategy.keepAlive(response, context)) {
        // Set the idle duration of this connection
        final long duration = keepAliveStrategy.getKeepAliveDuration(response, context);
        if (this.log.isDebugEnabled()) {
            final String s;
            if (duration > 0) {
                s = "for " + duration + " " + TimeUnit.MILLISECONDS;
            } else {
                s = "indefinitely";
            }
            this.log.debug("Connection can be kept alive " + s);
        }
        connHolder.setValidFor(duration, TimeUnit.MILLISECONDS);
        connHolder.markReusable();
    } else {
        connHolder.markNonReusable();
    }
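The duration used above comes from a keep-alive strategy. One common way to derive it is to read the timeout parameter of the server's Keep-Alive header, and the sketch below illustrates that idea. KeepAliveDurationSketch and keepAliveMillis are hypothetical names, and returning -1 for "no limit given" is an assumption chosen to match the duration > 0 check in the fragment above.

```java
/** Hypothetical sketch: derive a keep-alive duration from a Keep-Alive header value. */
class KeepAliveDurationSketch {
    /**
     * Returns the timeout in milliseconds from a value such as "timeout=5, max=100",
     * or -1 if the header is absent or carries no usable timeout.
     */
    static long keepAliveMillis(String keepAliveHeader) {
        if (keepAliveHeader == null) {
            return -1;
        }
        for (String part : keepAliveHeader.split(",")) {
            String[] kv = part.trim().split("=", 2);
            if (kv.length == 2 && "timeout".equalsIgnoreCase(kv[0].trim())) {
                try {
                    return Long.parseLong(kv[1].trim()) * 1000L;  // header value is in seconds
                } catch (NumberFormatException ignore) {
                    // malformed number: treat it as if no timeout had been given
                }
            }
        }
        return -1;
    }
}
```

A result of -1 corresponds to the "indefinitely" branch of the log message above, while a positive value would be passed on as the period for which the connection stays valid.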

    @Override
    public boolean keepAlive(final HttpResponse response,
                             final HttpContext context) {
        Args.notNull(response, "HTTP response");
        Args.notNull(context, "HTTP context");

        // Check for a self-terminating entity. If the end of the entity will
        // be indicated by closing the connection, there is no keep-alive.
        final ProtocolVersion ver = response.getStatusLine().getProtocolVersion();
        final Header teh = response.getFirstHeader(HTTP.TRANSFER_ENCODING);
        if (teh != null) {
            if (!HTTP.CHUNK_CODING.equalsIgnoreCase(teh.getValue())) {
                return false;
            }
        } else {
            if (canResponseHaveBody(response)) {
                final Header[] clhs = response.getHeaders(HTTP.CONTENT_LEN);
                // Do not reuse if not properly content-length delimited
                if (clhs.length == 1) {
                    final Header clh = clhs[0];
                    try {
                        final int contentLen = Integer.parseInt(clh.getValue());
                        if (contentLen < 0) {
                            return false;
                        }
                    } catch (final NumberFormatException ex) {
                        return false;
                    }
                } else {
                    return false;
                }
            }
        }

        // Check for the "Connection" header. If that is absent, check for
        // the "Proxy-Connection" header. The latter is an unspecified and
        // broken but unfortunately common extension of HTTP.
        HeaderIterator hit = response.headerIterator(HTTP.CONN_DIRECTIVE);
        if (!hit.hasNext()) {
            hit = response.headerIterator("Proxy-Connection");
        }

        if (hit.hasNext()) {
            try {
                final TokenIterator ti = createTokenIterator(hit);
                boolean keepalive = false;
                while (ti.hasNext()) {
                    final String token = ti.nextToken();
                    if (HTTP.CONN_CLOSE.equalsIgnoreCase(token)) {
                        return false;
                    } else if (HTTP.CONN_KEEP_ALIVE.equalsIgnoreCase(token)) {
                        // continue the loop, there may be a "close" afterwards
                        keepalive = true;
                    }
                }
                if (keepalive) {
                    return true;
                    // neither "close" nor "keep-alive", use default policy
                }
            } catch (final ParseException px) {
                // invalid connection header means no persistent connection
                // we don't have logging in HttpCore, so the exception is lost
                return false;
            }
        }

        // default since HTTP/1.1 is persistent, before it was non-persistent
        return !ver.lessEquals(HttpVersion.HTTP_1_0);
    }
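The second half of keepAlive, the Connection token scan followed by the protocol default, is easy to check in isolation. The sketch below covers only that part: ReuseDecisionSketch is a hypothetical name, the header is passed in as a plain string, and the Transfer-Encoding and Content-Length checks of the real method are deliberately left out.

```java
/** Hypothetical sketch of the Connection-header part of the reuse decision. */
class ReuseDecisionSketch {
    /**
     * @param connectionHeader value of the Connection header, or null if it is absent
     * @param http11           true if the response uses HTTP/1.1 or later
     */
    static boolean keepAlive(String connectionHeader, boolean http11) {
        if (connectionHeader != null) {
            boolean sawKeepAlive = false;
            for (String token : connectionHeader.split(",")) {
                String t = token.trim();
                if ("close".equalsIgnoreCase(t)) {
                    return false;              // "close" always wins, wherever it appears
                }
                if ("keep-alive".equalsIgnoreCase(t)) {
                    sawKeepAlive = true;       // keep scanning, a "close" may still follow
                }
            }
            if (sawKeepAlive) {
                return true;
            }
        }
        // Neither token present: persistent by default from HTTP/1.1 on, not before.
        return http11;
    }
}
```

This shows why an HTTP/1.0 response needs an explicit keep-alive token to be reused, whereas an HTTP/1.1 response is reused unless it says close.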
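Next, if the response is a redirect (a 302, for example), HttpClient consults its redirect strategy to decide whether to follow it. The source is as follows: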
    for (int redirectCount = 0;;) {
        final CloseableHttpResponse response = requestExecutor.execute(
                currentRoute, currentRequest, context, execAware);
        try {
            if (config.isRedirectsEnabled() &&
                    this.redirectStrategy.isRedirected(currentRequest, response, context)) {

                if (redirectCount >= maxRedirects) {
                    throw new RedirectException("Maximum redirects (" + maxRedirects + ") exceeded");
                }
                redirectCount++;

                final HttpRequest redirect = this.redirectStrategy.getRedirect(
                        currentRequest, response, context);
                if (!redirect.headerIterator().hasNext()) {
                    final HttpRequest original = request.getOriginal();
                    redirect.setHeaders(original.getAllHeaders());
                }
                currentRequest = HttpRequestWrapper.wrap(redirect);

                if (currentRequest instanceof HttpEntityEnclosingRequest) {
                    RequestEntityProxy.enhance((HttpEntityEnclosingRequest) currentRequest);
                }

                final URI uri = currentRequest.getURI();
                final HttpHost newTarget = URIUtils.extractHost(uri);
                if (newTarget == null) {
                    throw new ProtocolException(
                            "Redirect URI does not specify a valid host name: " + uri);
                }

                // Reset virtual host and auth states if redirecting to another host
                if (!currentRoute.getTargetHost().equals(newTarget)) {
                    final AuthState targetAuthState = context.getTargetAuthState();
                    if (targetAuthState != null) {
                        this.log.debug("Resetting target auth state");
                        targetAuthState.reset();
                    }
                    final AuthState proxyAuthState = context.getProxyAuthState();
                    if (proxyAuthState != null) {
                        final AuthScheme authScheme = proxyAuthState.getAuthScheme();
                        if (authScheme != null && authScheme.isConnectionBased()) {
                            this.log.debug("Resetting proxy auth state");
                            proxyAuthState.reset();
                        }
                    }
                }

                currentRoute = this.routePlanner.determineRoute(newTarget, currentRequest, context);
                if (this.log.isDebugEnabled()) {
                    this.log.debug("Redirecting to '" + uri + "' via " + currentRoute);
                }
                EntityUtils.consume(response.getEntity());
                response.close();
            } else {
                return response;
            }
        } catch (final RuntimeException ex) {
            response.close();
            throw ex;
        } catch (final IOException ex) {
            response.close();
            throw ex;
        } catch (final HttpException ex) {
            // Protocol exception related to a direct.
            // The underlying connection may still be salvaged.
            try {
                EntityUtils.consume(response.getEntity());
            } catch (final IOException ioex) {
                this.log.debug("I/O error while releasing connection", ioex);
            } finally {
                response.close();
            }
            throw ex;
        }
    }
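As the code shows, the response is obtained first, and then config.isRedirectsEnabled() and this.redirectStrategy.isRedirected(currentRequest, response, context) decide whether to redirect. If both are true the redirect is followed (up to maxRedirects, after which a RedirectException is thrown); otherwise the response is returned directly. Both redirectsEnabled and redirectStrategy can be customized.
HttpClient is another excellent open-source framework from Apache, most commonly used to manage persistent connections and to call backend RESTful APIs. It has many more features that this article has not covered, such as authentication and HTTPS support; they are well worth exploring in more depth if you are interested.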
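The control flow of the redirect loop (follow, count, stop at a limit) can be sketched without any HTTP at all. In the sketch below a map from URL to Location target stands in for the server; RedirectLoopSketch and resolve are hypothetical names, and IllegalStateException stands in for HttpClient's RedirectException.

```java
import java.util.Map;

/** Hypothetical sketch of a bounded redirect-following loop. */
class RedirectLoopSketch {
    /**
     * Follows entries of {@code redirects} (source URL mapped to its Location target)
     * starting at {@code startUrl}, and returns the first URL that does not redirect.
     */
    static String resolve(String startUrl, Map<String, String> redirects, int maxRedirects) {
        String current = startUrl;
        for (int redirectCount = 0;;) {
            String location = redirects.get(current);   // stands in for executing the request
            if (location == null) {
                return current;                         // not a redirect: this is the final response
            }
            if (redirectCount >= maxRedirects) {
                throw new IllegalStateException(
                        "Maximum redirects (" + maxRedirects + ") exceeded");
            }
            redirectCount++;
            current = location;                         // next iteration requests the new target
        }
    }
}
```

The counter is checked before it is incremented, as in the quoted source, so a limit of N allows exactly N hops and the hop after that raises the error, which also stops redirect cycles.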
Original article: http://my.oschina.net/xionghui/blog/506320