标签:
查看Jetty-io包,清单如下:
接口类:ByteBufferPool、ClientConnectionFactory、Connection、Connection.Listener、Connection.UpgradeFrom、Connection.UpgradeTo、EndPoint、ManagedSelector.SelectableEndPoint、NetworkTrafficListener
实体类:AbstractConnection、AbstractEndPoint、ArrayByteBufferPool、ArrayByteBufferPool.Bucket、ByteArrayEndPoint、ByteBufferPool.Lease、ChannelEndPoint、ClientConnectionFactory.Helper、Connection.Listener.Adapter、FillInterest、IdleTimeout、LeakTrackingByteBufferPool、ManagedSelector、MappedByteBufferPool、MappedByteBufferPool.Tagged、NegotiatingClientConnection、NegotiatingClientConnectionFactory、NetworkTrafficListener.Adapter、NetworkTrafficSelectChannelEndPoint、SelectChannelEndPoint、SelectorManager、WriteFlusher、WriterOutputStream
异常类:EofException、RuntimeIOException
从名字看几个主要的类可能为:Connection、ByteBufferPool、SelectorManager、EndPoint,因为其他类应该是从中延伸出来的。
首先看Connection接口:
/**
 * <p>A connection that is associated with an {@link EndPoint} and can be closed.</p>
 * <p>Besides the open/close callbacks, the interface lets callers register
 * {@link Listener}s and exposes a handful of counters.</p>
 */
public interface Connection extends Closeable {

    /**
     * <p>Adds a listener to this connection.</p>
     *
     * @param listener the listener to add
     * @see Listener
     */
    void addListener(Listener listener);

    /**
     * <p>Open-side counterpart of {@link #onClose()}.</p>
     * <p>NOTE(review): presumably invoked by the creator of the connection once it is
     * opened, mirroring {@link #onClose()} - confirm against the callers.</p>
     */
    void onOpen();

    /**
     * <p>Callback method invoked when this {@link Connection} is closed.</p>
     * <p>Creators of the connection implementation are responsible for calling this method.</p>
     */
    void onClose();

    /**
     * @return the {@link EndPoint} associated with this {@link Connection}
     */
    EndPoint getEndPoint();

    /**
     * <p>Performs a logical close of this connection.</p>
     * <p>For simple connections, this may just mean to delegate the close to the associated
     * {@link EndPoint} but, for example, SSL connections should write the SSL close message
     * before closing the associated {@link EndPoint}.</p>
     * <p>Unlike {@link Closeable#close()}, this declaration has no {@code throws} clause.</p>
     */
    @Override
    void close();

    /**
     * @return the inbound message counter (exact semantics are up to the implementation)
     */
    int getMessagesIn();

    /**
     * @return the outbound message counter (exact semantics are up to the implementation)
     */
    int getMessagesOut();

    /**
     * @return the inbound byte counter (exact semantics are up to the implementation)
     */
    long getBytesIn();

    /**
     * @return the outbound byte counter (exact semantics are up to the implementation)
     */
    long getBytesOut();

    /**
     * @return the creation timestamp of this connection
     */
    long getCreatedTimeStamp();

    /**
     * <p>A {@link Connection} that can hand over its unconsumed input when it is upgraded.</p>
     */
    interface UpgradeFrom extends Connection {

        /**
         * <p>Takes the input buffer from the connection on upgrade.</p>
         * <p>This method is used to take any unconsumed input from
         * a connection during an upgrade.</p>
         *
         * @return a buffer of unconsumed input. The caller must return the buffer
         * to the buffer pool when consumed and this connection must not.
         */
        ByteBuffer onUpgradeFrom();
    }

    /**
     * <p>A {@link Connection} that can receive prefilled input when it becomes the target
     * of an upgrade.</p>
     */
    interface UpgradeTo extends Connection {

        /**
         * <p>Callback method invoked when this {@link Connection} is upgraded.</p>
         * <p>This must be called before {@link #onOpen()}.</p>
         *
         * @param prefilled an optional buffer that can contain prefilled data. Typically this
         * results from an upgrade of one protocol to the other where the old connection has
         * buffered data destined for the new connection. The new connection must take ownership
         * of the buffer and is responsible for returning it to the buffer pool.
         */
        void onUpgradeTo(ByteBuffer prefilled);
    }

    /**
     * <p>A listener for connection events.</p>
     * <p>Listeners can be added to a {@link Connection} to get open and close events.
     * The AbstractConnectionFactory implements a pattern where objects implementing this
     * interface that have been added via {@code Container#addBean(Object)} to the Connector
     * or ConnectionFactory are added as listeners to all new connections.</p>
     */
    interface Listener {

        /**
         * @param connection the connection that was opened
         */
        void onOpened(Connection connection);

        /**
         * @param connection the connection that was closed
         */
        void onClosed(Connection connection);

        /**
         * <p>A {@link Listener} whose callbacks all do nothing, so that subclasses only need
         * to override the events they care about.</p>
         */
        class Adapter implements Listener {

            @Override
            public void onOpened(Connection connection) {
                // Intentionally empty.
            }

            @Override
            public void onClosed(Connection connection) {
                // Intentionally empty.
            }
        }
    }
}
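Connection接口除了用于添加监听器(addListener)并定义监听接口Listener之外,还声明了连接的生命周期回调(onOpen、onClose)、获取关联EndPoint的getEndPoint()、逻辑关闭方法close(),以及消息数、字节数和创建时间等统计方法;内部接口UpgradeFrom和UpgradeTo则用于在协议升级时,把旧连接中尚未消费的缓冲数据交给新连接。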
再看一个实现了Connection接口的抽象类AbstractConnection:
public abstract class AbstractConnection implements Connection { private static final Logger LOG = Log.getLogger(AbstractConnection.class); private final List<Listener> listeners = new CopyOnWriteArrayList<>(); private final long _created=System.currentTimeMillis(); private final EndPoint _endPoint; private final Executor _executor; private final Callback _readCallback; private int _inputBufferSize=2048; protected AbstractConnection(EndPoint endp, Executor executor) { if (executor == null) throw new IllegalArgumentException("Executor must not be null!"); _endPoint = endp; _executor = executor; _readCallback = new ReadCallback(); } @Override public void addListener(Listener listener) { listeners.add(listener); } public int getInputBufferSize() { return _inputBufferSize; } public void setInputBufferSize(int inputBufferSize) { _inputBufferSize = inputBufferSize; } protected Executor getExecutor() { return _executor; } @Deprecated public boolean isDispatchIO() { return false; } protected void failedCallback(final Callback callback, final Throwable x) { if (callback.isNonBlocking()) { try { callback.failed(x); } catch (Exception e) { LOG.warn(e); } } else { try { getExecutor().execute(new Runnable() { @Override public void run() { try { callback.failed(x); } catch (Exception e) { LOG.warn(e); } } }); } catch(RejectedExecutionException e) { LOG.debug(e); callback.failed(x); } } } /** * <p>Utility method to be called to register read interest.</p> * <p>After a call to this method, {@link #onFillable()} or {@link #onFillInterestedFailed(Throwable)} * will be called back as appropriate.</p> * @see #onFillable() */ public void fillInterested() { if (LOG.isDebugEnabled()) LOG.debug("fillInterested {}",this); getEndPoint().fillInterested(_readCallback); } public boolean isFillInterested() { return getEndPoint().isFillInterested(); } /** * <p>Callback method invoked when the endpoint is ready to be read.</p> * @see #fillInterested() */ public abstract void onFillable(); /** * 
<p>Callback method invoked when the endpoint failed to be ready to be read.</p> * @param cause the exception that caused the failure */ protected void onFillInterestedFailed(Throwable cause) { if (LOG.isDebugEnabled()) LOG.debug("{} onFillInterestedFailed {}", this, cause); if (_endPoint.isOpen()) { boolean close = true; if (cause instanceof TimeoutException) close = onReadTimeout(); if (close) { if (_endPoint.isOutputShutdown()) _endPoint.close(); else { _endPoint.shutdownOutput(); fillInterested(); } } } } /** * <p>Callback method invoked when the endpoint failed to be ready to be read after a timeout</p> * @return true to signal that the endpoint must be closed, false to keep the endpoint open */ protected boolean onReadTimeout() { return true; } @Override public void onOpen() { if (LOG.isDebugEnabled()) LOG.debug("onOpen {}", this); for (Listener listener : listeners) listener.onOpened(this); } @Override public void onClose() { if (LOG.isDebugEnabled()) LOG.debug("onClose {}",this); for (Listener listener : listeners) listener.onClosed(this); } @Override public EndPoint getEndPoint() { return _endPoint; } @Override public void close() { getEndPoint().close(); } @Override public int getMessagesIn() { return -1; } @Override public int getMessagesOut() { return -1; } @Override public long getBytesIn() { return -1; } @Override public long getBytesOut() { return -1; } @Override public long getCreatedTimeStamp() { return _created; } @Override public String toString() { return String.format("%s@%x[%s]", getClass().getSimpleName(), hashCode(), _endPoint); } private class ReadCallback implements Callback { @Override public void succeeded() { onFillable(); } @Override public void failed(final Throwable x) { onFillInterestedFailed(x); } @Override public String toString() { return String.format("AC.ReadCB@%x{%s}", AbstractConnection.this.hashCode(),AbstractConnection.this); } } }
标签:
原文地址:http://my.oschina.net/daidetian/blog/522976