标签:
PipedInputStream类与PipedOutputStream类用于在应用程序中创建管道通信.一个PipedInputStream实例对象必须和一个PipedOutputStream实例对象进行连接而产生一个通信管道.PipedOutputStream可以向管道中写入数据,PipedIntputStream可以读取PipedOutputStream向管道中写入的数据.这两个类主要用来完成线程之间的通信.一个线程的PipedInputStream对象能够从另外一个线程的PipedOutputStream对象中读取数据. 
ps:使用这组I/O流必须在多线程环境下. 
        首先简单的介绍一下这两个类的实现原理,PipedInputStream和PipedOutputStream的实现原理类似于"生产者-消费者"原理,PipedOutputStream是生产者,PipedInputStream是消费者,在PipedInputStream中有一个buffer字节数组,默认大小为1024,作为缓冲区,存放"生产者"生产出来的东东.还有两个变量,in,out,in是用来记录"生产者"生产了多少,out是用来记录"消费者"消费了多少,in为-1表示消费完了,in==out表示生产满了.当消费者没东西可消费的时候,也就是当in为-1的时候,消费者会一直等待,直到有东西可消费. 
        因为生产和消费的方法都是synchronized的,所以肯定是生产者先生产出一定数量的东西,消费者才可以开始消费,所以在生产的时候发现in==out,那一定是满了,同理,在消费的时候发现in==out,那一定是消费完了,因为生产的东西永远要比消费来得早,消费者最多可以消费和生产的数量相等的东西,而不会超出. 
        好了,介绍完之后,看看SUN高手是怎么实现这些功能的.由于buffer(存放产品的通道)这个关键变量在PipedInputStream消费者这个类中,所以要想对buffer操作,只能通过PipedInputStream来操作,因此将产品放入通道的操作是在PipedInputStream中. 
存放产品的行为: 
- protected synchronized void receive(int b) throws IOException {
 
-     checkStateForReceive();
 
-     writeSide = Thread.currentThread();
 
-     if (in == out)  
 
-         awaitSpace();
 
-     if (in < 0) {
 
-         in = 0;  
 
-         out = 0;  
 
-     }  
 
-     buffer[in++] = (byte) (b & 0xFF);  
 
-     if (in >= buffer.length) {
 
-         in = 0;  
 
-     }  
 
- }  
 
-   
 
- synchronized void receive(byte b[], int off, int len) throws IOException {
 
-     checkStateForReceive();  
 
-     writeSide = Thread.currentThread();  
 
-     int bytesToTransfer = len;
 
-     while (bytesToTransfer > 0) {  
 
-         if (in == out)  
 
-             awaitSpace();  
 
-         int nextTransferAmount = 0;
 
-         if (out < in) {  
 
-             nextTransferAmount = buffer.length - in;
 
-         } else if (in < out) {  
 
-             if (in == -1) {  
 
-                 in = out = 0;
 
-                 nextTransferAmount = buffer.length - in;  
 
-             } else {  
 
-                 nextTransferAmount = out - in;
 
-             }  
 
-         }  
 
-         if (nextTransferAmount > bytesToTransfer)
 
-             nextTransferAmount = bytesToTransfer;
 
-         assert (nextTransferAmount > 0);  
 
-         System.arraycopy(b, off, buffer, in, nextTransferAmount);
 
-         bytesToTransfer -= nextTransferAmount;
 
-         off += nextTransferAmount;  
 
-         in += nextTransferAmount;  
 
-         if (in >= buffer.length) {
 
-             in = 0;  
 
-         }  
 
-     }  
 
- }  
 
 
消费产品的行为: 
- public synchronized int read() throws IOException {
 
-     if (!connected) {  
 
-         throw new IOException("Pipe not connected");  
 
-     } else if (closedByReader) {  
 
-         throw new IOException("Pipe closed");  
 
-     } else if (writeSide != null && !writeSide.isAlive() && !closedByWriter  
 
-             && (in < 0)) {  
 
-         throw new IOException("Write end dead");  
 
-     }  
 
-   
 
-     readSide = Thread.currentThread();  
 
-     int trials = 2;  
 
-     while (in < 0) {
 
-         if (closedByWriter) {  
 
-             
 
-             return -1;
 
-         }  
 
-         if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {  
 
-             throw new IOException("Pipe broken");  
 
-         }  
 
-         
 
-         notifyAll();  
 
-         try {  
 
-             wait(1000);  
 
-         } catch (InterruptedException ex) {  
 
-             throw new java.io.InterruptedIOException();  
 
-         }  
 
-     }  
 
-     int ret = buffer[out++] & 0xFF;  
 
-     if (out >= buffer.length) {  
 
-         out = 0;
 
-     }  
 
-     if (in == out) {  
 
-         
 
-         in = -1;
 
-     }  
 
-     return ret;  
 
- }  
 
-   
 
- public synchronized int read(byte b[], int off, int len) throws IOException {  
 
-     if (b == null) {  
 
-         throw new NullPointerException();  
 
-     } else if ((off < 0) || (off > b.length) || (len < 0)  
 
-             || ((off + len) > b.length) || ((off + len) < 0)) {  
 
-         throw new IndexOutOfBoundsException();  
 
-     } else if (len == 0) {  
 
-         return 0;  
 
-     }  
 
-   
 
-     
 
-     int c = read();
 
-     if (c < 0) {  
 
-         return -1;
 
-     }  
 
-     b[off] = (byte) c;  
 
-     int rlen = 1;  
 
-   
 
-     
 
-     
 
-     
 
-   
 
-     while ((in >= 0) && (--len > 0)) {  
 
-         b[off + rlen] = buffer[out++];  
 
-         rlen++;  
 
-         if (out >= buffer.length) {  
 
-             out = 0;  
 
-         }  
 
-         if (in == out) {  
 
-             
 
-             in = -1;
 
-         }  
 
-     }  
 
-     return rlen;  
 
- }  
 
 PipedInputStream/PipedOutputStream原理
标签:
原文地址:http://www.cnblogs.com/qiumingcheng/p/5336745.html