package server2.connector; import java.io.IOException; import java.net.InetAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.UnknownHostException; /** * HttpConnector主要负责接入消息,分发给不同的处理器 * * @author Administrator * */ public class HttpConnector implements Runnable{ private boolean shutdown = false;// 服务器是否停止 private String scheme = "http"; public String getScheme() { return scheme; } public void setScheme(String scheme) { this.scheme = scheme; } @Override public void run() { ServerSocket serverSocket = null; int port = 8080; try { serverSocket = new ServerSocket(port, 1, InetAddress.getByName("")); } catch (UnknownHostException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } Socket socket = null; while (!shutdown) {// 不停的接受请求 try { socket = serverSocket.accept(); } catch (IOException e) { e.printStackTrace(); } HttpProcessor processor = new HttpProcessor(); processor.process(socket); } } /** * 启动connector监听线程 */ public void start() { System.out.println("started!!!"); Thread thread = new Thread(this); thread.start(); } }这是一种非常原始的处理方法,一旦接收到请求便会构造出一个处理器,由处理器去处理socket,并且process是一个同步的方法,这里没有用到Processor的池化,也没有使用异步的方式处理socket。这种同步阻塞的方式,connector的处理能力有限,一旦处理过程被阻塞,那么connector便会拒绝下一个请求。
/** * The background thread that listens for incoming TCP/IP connections and * hands them off to an appropriate processor. */ public void run() { // Loop until we receive a shutdown command while (!stopped) { // Accept the next incoming connection from the server socket Socket socket = null; try { // if (debug >= 3) // log("run: Waiting on serverSocket.accept()"); //接收客户端的socket连接 socket = serverSocket.accept(); // if (debug >= 3) // log("run: Returned from serverSocket.accept()"); if (connectionTimeout > 0) socket.setSoTimeout(connectionTimeout); socket.setTcpNoDelay(tcpNoDelay); } catch (AccessControlException ace) { log("socket accept security exception", ace); continue; } catch (IOException e) { // if (debug >= 3) // log("run: Accept returned IOException", e); try { // If reopening fails, exit synchronized (threadSync) { if (started && !stopped) log("accept error: ", e); if (!stopped) { // if (debug >= 3) // log("run: Closing server socket"); serverSocket.close(); // if (debug >= 3) // log("run: Reopening server socket"); serverSocket = open(); } } // if (debug >= 3) // log("run: IOException processing completed"); } catch (IOException ioe) { log("socket reopen, io problem: ", ioe); break; } catch (KeyStoreException kse) { log("socket reopen, keystore problem: ", kse); break; } catch (NoSuchAlgorithmException nsae) { log("socket reopen, keystore algorithm problem: ", nsae); break; } catch (CertificateException ce) { log("socket reopen, certificate problem: ", ce); break; } catch (UnrecoverableKeyException uke) { log("socket reopen, unrecoverable key: ", uke); break; } catch (KeyManagementException kme) { log("socket reopen, key management problem: ", kme); break; } continue; } // Hand this socket off to an appropriate processor //创建processor,如果连接池没有可用的处理器,则拒绝消息,其中的createProcessor方法基于栈来实现的处理池 HttpProcessor processor = createProcessor(); if (processor == null) { try { log(sm.getString("httpConnector.noProcessor")); socket.close(); } catch (IOException e) { ; } continue; } // if (debug >= 3) // log("run: Assigning socket to processor " + processor); //否则的话将socket传递给processor processor.assign(socket); // The processor will recycle itself when it finishes } // Notify the threadStop() method that we have shut ourselves down // if (debug >= 3) // log("run: Notifying threadStop() that we have shut down"); synchronized (threadSync) { threadSync.notifyAll(); } }接收到socket调用processor的assign方法传递给HttpProcessor。处理器的实现也和上面有很大的差别,这里单独实现为一个线程类,先不急看assign方法,看看processor的run方法:
/** * The background thread that listens for incoming TCP/IP connections and * hands them off to an appropriate processor. */ public void run() { // Process requests until we receive a shutdown signal //不停的循环处理消息 while (!stopped) { // Wait for the next socket to be assigned //调用自身的await() Socket socket = await(); //未获取到socket继续循环 if (socket == null) continue; // Process the request from this socket try { //获取到则进行处理 process(socket); } catch (Throwable t) { log("process.invoke", t); } // Finish up this request //将该处理线程放回池中复用 connector.recycle(this); } // Tell threadStop() we have shut ourselves down successfully synchronized (threadSync) { threadSync.notifyAll(); } }await方法:
/** * Await a newly assigned Socket from our Connector, or <code>null</code> * if we are supposed to shut down. 该方法是个同步方法,并且内部可能发生阻塞 */ private synchronized Socket await() { // Wait for the Connector to provide a new Socket //如果没有可处理的socket则同步阻塞,这里connector调用processor的assign方法会传递 //过来可用的socket while (!available) { try { wait(); } catch (InterruptedException e) { } } // Notify the Connector that we have received this Socket //否则可以处理,成员变量赋值给私有变量 Socket socket = this.socket; //设置为没有可处理的socket available = false; //唤醒被阻塞的线程 notifyAll(); if ((debug >= 1) && (socket != null)) log(" The incoming request has been awaited"); return (socket); }
synchronized void assign(Socket socket) { // Wait for the Processor to get the previous Socket //available表示是否有可处理的socket,第一次值为false跳过while循环 while (available) { // try { wait(); } catch (InterruptedException e) { } } // Store the newly available Socket and notify our thread //将socket赋值给成员变量socket this.socket = socket; //修改值为true available = true; //唤醒阻塞在该对象上的线程 notifyAll(); if ((debug >= 1) && (socket != null)) log(" An incoming request is being assigned"); }如果当前线程有可处理的socket则阻塞住connector,否则唤醒阻塞在await上的线程,处理可用的socket,这里的assign方法是异步返回的,只是负责传递socket,处理过程由processor线程完成。connector和processor线程的通讯由available变量和wait,notifyAll方发共同完成。