码迷,mamicode.com
首页 > 其他好文 > 详细

TIJ -- 任务间使用管道进行输入/输出

时间:2017-11-26 19:39:24      阅读:180      评论:0      收藏:0      [点我收藏+]

标签:自动   queue   static   sleep   buffer   send   一段   nbsp   ack   

  1. 通过输入/输出在线程间进行通信通常很有用。提供线程功能的类库以“管道”的形式对线程间的输入/输出提供了支持。它们在Java输入/输出类库中的对应物就是PipedWriter类(允许任务向管道写)和PipedReader类(允许不同任务从同一个管道中读取)。这个模型可以看成是“生产者 - 消费者”问题的变体,这里的管道就是一个封装好的解决方案。管道基本上是一个阻塞队列,存在于多个引入BlockingQueue之前的Java版本中。

  2. 下面是一个简单例子,两个任务使用一个管道进行通信:

    Class : 

package lime.thinkingInJava._021._005._005;

import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @Author : Lime
 * @Description :
 * @Remark :
 */
class Sender implements Runnable{
    private Random rand = new Random(47);
    private PipedWriter out = new PipedWriter();
    public PipedWriter getPipedWriter(){
        return out;
    }
    public void run(){
        try{
            while (true){
                for(char c = ‘A‘; c <= ‘z‘;c++){
                    out.write(c);
                    TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));
                }
            }
        } catch (InterruptedException e) {
            System.out.println(e + " Sender sleep Interrupted");
        } catch (IOException e) {
            System.out.println(e + " Sender write exception");
        }
    }
}
class Receiver implements Runnable{
    private PipedReader in;
    public Receiver(Sender sender) throws IOException {
        in = new PipedReader(sender.getPipedWriter());
    }
    public void run(){
        try{
            while (true){
                //Blocks until characters are there;
                System.out.println("Read : " + (char)in.read());
            }
        } catch (IOException e) {
            System.out.println(e + " Receiver read exception");
        }
    }
}
public class PipedIO {
    public static void main(String[] args) throws IOException, InterruptedException {
        Sender sender = new Sender();
        Receiver receiver = new Receiver(sender);
        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(sender);
        exec.execute(receiver);
        TimeUnit.SECONDS.sleep(4);
        exec.shutdownNow();
    }
}

  3. Console : 

Read : A
Read : B
Read : C
Read : D
Read : E
Read : F
Read : G
Read : H
Read : I
Read : J
Read : K
Read : L
Read : M
Read : N
Read : O
Read : P
Read : Q
java.lang.InterruptedException: sleep interrupted Sender sleep Interrupted
java.io.InterruptedIOException Receiver read exception

  4. Sender和Receiver代表了需要互相通信两个任务。Sender创建了一个PipedWriter,它是一个单独的对象;但是对于Receiver,PipedReader的建立必须在构造器中与一个PipedWriter相关联。Sender把数据放进Writer,然后休眠一段时间(随机数)。然而,Receiver没有Sleep()和wait()。但当它调用read()时,如果没有更多的数据,管道将自动阻塞。

  注意sender和receiver是在main()中启动的,即对象构造彻底完毕以后。如果你启动了一个没有构造完毕的对象,在不同的平台上管道可能会产生不一致的行为(注意,BlockingQueue使用起来更加健壮而容易)。

  在shutdownNow()被调用时,可以看到PipedReader与普通I/O之间最重要的差异 ------ PipedReader是可中断的。如果你将in.read()调用修改为System.in.read(),那么interrupt()将不能打断read()调用。

  5. PipedWriter的wirte() 源码解析

    /**
     * Writes the specified <code>char</code> to the piped output stream.
     * If a thread was reading data characters from the connected piped input
     * stream, but the thread is no longer alive, then an
     * <code>IOException</code> is thrown.
     * <p>
     * Implements the <code>write</code> method of <code>Writer</code>.
     *
     * @param      c   the <code>char</code> to be written.
     * @exception  IOException  if the pipe is
     *          <a href=PipedOutputStream.html#BROKEN> <code>broken</code></a>,
     *          {@link #connect(java.io.PipedReader) unconnected}, closed
     *          or an I/O error occurs.
     */
    public void write(int c)  throws IOException {
        if (sink == null) {
            throw new IOException("Pipe not connected");
        }
        //调用PipedReader的receive(c)方法,将c放入PipedReader的char buffer[]中
        sink.receive(c);
    }
    /**
     * Receives a char of data. This method will block if no input is
     * available.
     */
    synchronized void receive(int c) throws IOException {
        if (!connected) {
            //判断两个I/O流连接状态
            throw new IOException("Pipe not connected");
        } else if (closedByWriter || closedByReader) {
            //判断两个I/O流开启状态
            throw new IOException("Pipe closed");
        } else if (readSide != null && !readSide.isAlive()) {
            //判断输入流线程是否存活
            throw new IOException("Read end dead");
        }

        //获取输出流线程
        writeSide = Thread.currentThread();
        while (in == out) {
            //判断char buffer[] 是否已满
            if ((readSide != null) && !readSide.isAlive()) {
                //判断输入流状态是否存活
                throw new IOException("Pipe broken");
            }
            /* full: kick any waiting readers */
            // 间隔1000毫秒唤醒写线程 -- start
            notifyAll();
            try {
                //阻塞1000毫秒
                wait(1000);
            } catch (InterruptedException ex) {
                throw new java.io.InterruptedIOException();
            }
            // 间隔1000毫秒唤醒写线程 -- end
        }
        if (in < 0) {
            //判断char buffer[] 为空
            in = 0;
            out = 0;
        }
        buffer[in++] = (char) c;
        if (in >= buffer.length) {
            in = 0;
        }
    }

  6. 

 

 

 

 

 

啦啦啦

TIJ -- 任务间使用管道进行输入/输出

标签:自动   queue   static   sleep   buffer   send   一段   nbsp   ack   

原文地址:http://www.cnblogs.com/ClassNotFoundException/p/7899835.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!