码迷,mamicode.com
首页 > 编程语言 > 详细

第一章线程管理

时间:2015-08-30 07:41:50      阅读:212      评论:0      收藏:0      [点我收藏+]

标签:java   线程   并发   

Java 7 并发编程实战手册目录


代码下载(https://github.com/Wang-Jun-Chao/java-concurrency)


第一章线程管理


1.1简介

  在计算机领域中,我们说的并发(Concurrency)是指一系列任务的同时运行。如果一 台电脑有多个处理器或者有一个多核处理器,这个同时性(Simultaneity)是真正意义的并 发;但是一台电脑只有一个单核处理器,这个同时性并不是真正的并发
  与并发相关的另一个概念是并行(Parallelism)。与并发有不同的定义一样,并行也有 不同的定义。并发是在单核处理器中使用多线程执行应用,并行是在多核处理器中使用多线程执行应用, 这里的多核处理器可以是一个多核处理器,也可以是同一台电脑上的多个处理器。发执行应用的线程是非顺序执行的,为并行是使用很多线程去简化问题,这些线程是按预定顺序执行的。

1.2线程的创建和运行

  Java提供了两种方式来创建线程:
  ?继承Thread类,并且覆盖run()方法。
  ?创建一个实现Runnable接口的类。使用带参数的Thread构造器来创建Thread对 象。这个参数就是实现Runnable接口的类的一个对象。

package com.concurrency.task;

// 创建一个名为Calculator的类,它实现了 Runnable接口
public class Calculator implements Runnable {
    // 声明一个名为number的私有(private) int属性
    private int number;

    // 编写这个类的一个构造器,用来为属性number设置值。
    public Calculator(int number) {
        this.number = number;
    }

    // run方法。这个方法用来执行我们创建的线程的指令,它将对指定的数字进行乘法表运算。
    @Override
    public void run() {
        for (int i = 1; i <= 10; i++) {
            System.out.printf("%s: %d * %d = %d\n", Thread.currentThread().getName(), number, i, number * i);
        }
    }
}
package com.concurrency.core;

import com.concurrency.task.Calculator;

public class Main {
    public static void main(String[] args) {
        // 创建一个执行10次的循环。在每次循环中创建一个Calculator 对象,
        // 一个Thread对象,这个Thread对象使用刚创建的Calculator对象作为构造器的参数,
        // 然后调用刚创建的Thread对象的start()方法。
        for (int i = 0; i <= 10; i++) {
            Calculator calculator = new Calculator(i);
            Thread thread = new Thread(calculator);
            thread.start();
        }
    }
}

技术分享

图1.2-1 部分运行结果

  当调用Thread对象的start()方法时,另一个执行线程将被创建。因而在我们的程序中, 每次调用start()方法时,都会创建一个执行线程。
  当一个程序的所有线程都运行完成时,更明确的说,当所有非守护(non-daemon)线程 都运行完成的时候,这个Java程序将宣告结束。如果初始线程(执行main()方法的线程) 结束了,其余的线程仍将继续执行直到它们运行结束。如果某一个线程调用了 SyStem.exit()指令来结束程序的执行,所有的线程都将结束。
  对一个实现了Runnable接U的类来说,创建Thread对象并不会创建一个新的执行线程;同样的,调用它的run()方法,也不会创建一个新的执行线程。只有调用它的start()方 法时,才会创建一个新的执行线程。
  还有另一种方法能够创建新的执行线程。编写一个类并继承 Thread类,在这个类里覆盖run()方法,然后创建这个类的对象,并且调用start()方法,也会创建一个执行线程。

1.3线程信息的获取和设置

  Thread类有一些保存信息的属性,这些属性可以用来标识线程,显示线程的状态或者控制线程的优先级。
  ID:保存了线程的唯一标示符。
  Name:保存了线程名称
  Priority:保存了线程对象的优先级。线程的优先级是从1到10,其中1是最低优先级; 10是最高优先级。并不推荐去改变线程的优先级,然而,在需要的时候,也可以这么做。
  Status:保存了线程的状态。在Java中,线程的状态有6种:new、runnable、blocked、 waiting、time waiting 或者 terminated。

package com.concurrency.task;

public class Calculator implements Runnable{
    private int number;

    public Calculator(int number) {
        this.number = number;
    }

    @Override
    public void run() {
        // 指定的数字进行乘法表运算。
        for (int i = 1; i <= 10; i++) {
            System.out.printf("%s: %d * %d = %d\n", Thread.currentThread().getName(), number, i, number * i);
        }
    }
}
package com.concurrency.core;

import com.concurrency.task.Calculator;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URLDecoder;

public class Main {
    public static void main(String[] args) {

        // 线程优先级信息
        System.out.printf("Minimum Priority: %s\n", Thread.MIN_PRIORITY);
        System.out.printf("Normal  Priority: %s\n", Thread.NORM_PRIORITY);
        System.out.printf("Maximum Priority: %s\n", Thread.MAX_PRIORITY);

        Thread threads[];
        Thread.State status[];

        // 运行10个线程,5个线程的使用最高优先级,5个线程使用最低优先级
        threads = new Thread[10];
        status = new Thread.State[10];
        for (int i = 0; i < 10; i++) {
            threads[i] = new Thread(new Calculator(i));
            if (i % 2 == 0) {
                threads[i].setPriority(Thread.MAX_PRIORITY);
            } else {
                threads[i].setPriority(Thread.MIN_PRIORITY);
            }
            threads[i].setName("Thread " + i);
        }

        // 等待线程完成,同时将线程状态信息写入到文件中
        PrintWriter pw = null;
        try {
            // 获取项目运行的根路径
            String configFile = Main.class.getClassLoader().getResource("").getPath();
            configFile = URLDecoder.decode(configFile, "utf-8");

            System.out.println(configFile);

            File logFile = new File(configFile + "/data/log.txt"); // 创建一个记录文件对象

            if(!logFile.getParentFile().exists()) {    // 如果目录不存在就创建目录
                logFile.getParentFile().mkdirs();
            }

            if (!logFile.exists()) { //如果文件不存在就创建一个文件
                logFile.createNewFile();
            }

            FileWriter file = new FileWriter(logFile);
            pw = new PrintWriter(file);

            for (int i = 0; i < 10; i++) {
                pw.println("Main : Status of Thread " + i + " : " + threads[i].getState());
                status[i] = threads[i].getState();
            }

            for (int i = 0; i < 10; i++) {
                threads[i].start();
            }

            boolean finish = false;
            while (!finish) {
                for (int i = 0; i < 10; i++) {
                    if (threads[i].getState() != status[i]) { // 如果线程状态发生了变化
                        writeThreadInfo(pw, threads[i], status[i]); // 将线程变化之前的状态写入文件
                        status[i] = threads[i].getState(); // 记录新的状态
                    }
                }

                finish = true;
                for (int i = 0; i < 10; i++) {
                    // 如果所有线程都终止了finish就为true
                    finish = finish && (threads[i].getState() == Thread.State.TERMINATED);
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (pw != null) {
                pw.close();
            }
        }
    }

    /**
     * 将线程状态信息写入到一个文件中
     *
     * @param pw     写数据的流
     * @param thread 信息要被写入文件的线程
     * @param state  线程的前一个状态
     */
    private static void writeThreadInfo(PrintWriter pw, Thread thread, Thread.State state) {
        pw.printf("Main : Id %d ---- %s\n", thread.getId(), thread.getName());
        pw.printf("Main : Priority:  %d\n", thread.getPriority());
        pw.printf("Main : Old State: %s\n", state);
        pw.printf("Main : New State: %s\n", thread.getState());
        pw.printf("Main : ************************************\n");

    }
}

技术分享

图1.3-1 部分运行结果

  Thread类的属性存储了线程的所有信息。JVM使用线程的priority属性来决定某一刻 由哪个线程來使用CPU,并且根据线程的情景为它们设置实际状态。
  如果没有为线程指定一个名字,JVM将自动给它分配一个名字,格式是Thread-XX, 其中XX是一组数字。线程的ID和状态是不允许被修改的,线程类没有提供setId()和 setStatus()方法来修改它们。
  通过Thread对象访问属性信息。也可以通过实现 Runnable接口的对象来访问这些属性信息。如果一个线程是以Runnable对象为参数构建的, 那么也可以使用Thread类的静态方法currentThread()来访问这个线程对象。
  如果使用setPriority()方法设置的优先级不是从1到10这个范围内的值, 运行时就会抛出IllegalArgumentException异常。

1.4线程的中断

  如果一个Java程序有不止一个执行线程,当所有线程都运行结束的时候,这个Java 程序才能运行结束;更确切地说应该是所有的非守护线程运行结束时,或者其中一个线程调用了 System.exit()方法时,这个Java程序才运行结束。如果你想终止一个程序,或者程序的某个用户试图取消线程对象正在运行的任务,就需要结束这个线程。
  Java提供了中断机制,我们可以使用它来结束一个线程。这种机制要求线程检查它是 否被中断了,然后决定是不是响应这个中断请求。线程允许忽略中断请求并且继续执行。

package com.concurrency.task;

public class PrimeGenerator extends Thread {
    @Override
    public void run() {
        long number = 1L;

        while (true) {
            // 对每个数字,计算它是不是一个质数,如果是的话就打印到控制台。
            if (isPrime(number)) {
                System.out.printf("Number %d is Prime\n", number);
            }

            // 当被中断时,输出一条消息,并且退出方法
            if (isInterrupted()) {
                System.out.printf("The Prime Generator has been Interrupted\n");
                return;
            }
            number++;
        }
    }

    /**
     * 判断一个数是否是质数
     *
     * @param number 待判断的数
     * @return true是质数,false不是质数
     */
    private boolean isPrime(long number) {
        if (number <= 2) {
            return true;
        }

        for (long i = 2; i < number; i++) {
            if (number % i == 0) {
                return false;
            }
        }
        return true;
    }
}
package com.concurrency.core;

import com.concurrency.task.PrimeGenerator;

import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {

        Thread task = new PrimeGenerator();
        task.start(); // 启动质数生成线程
        try {
            TimeUnit.SECONDS.sleep(5); // 主线程休眠5s
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        task.interrupt(); // 质数生成线程中断
    }
}

技术分享

图1.4-1 部分运行结果

  Thread类有一个表明线程被中断与否的属性,它存放的是布尔值。线程的interrupt() 方法被调用时,这个属性就会被设置为true。
  isInterrupted()方法只是返回这个属性的值。还有一个方法可以检査线程是否已被中断,即Thread类的静态方法interrupted(),用 来检査当前执行的线程是否被中断。
  isInterrupted()和 interrupted()方法有一个很大的区别。isInterrupted()不能改变 interrupted 属性的值,但是后者能设置interrupted属性为false。因为interrupted()是一个静态方法,更推荐使用isInterruptedO()方法。
线程可以忽略中断,但并不是预期的行为。

1.5线程中断的控制

  如果线程实现了复杂的算法并且分布在几个方法中,或者线程里有递归调用的方法,需要有一个更好的机制来控制线程的中断。为了达到这个目的,Java提供了 InterruptedException异常。当检査到线程中断的时候,就抛出这个异常,然后在run()中捕获并处理这个异常。

package com.concurrency.task;

import java.io.File;

// 文件搜索类,给定一个文件目录,搜索其中指定的文件
public class FileSearch implements Runnable {
    /**
     * 搜索的初始路径
     */
    private String initPath;
    /**
     * 要搜索的文件名
     */
    private String fileName;


    /**
     * 构造函数
     *
     * @param initPath 搜索的初始路径
     * @param fileName 要搜索的文件名
     */
    public FileSearch(String initPath, String fileName) {
        this.initPath = initPath;
        this.fileName = fileName;
    }

    @Override
    public void run() {

    }

    /**
     * 清空资源,在本例中为空
     */
    private void cleanResources() {
        // 不需要做什么
    }

    /**
     * 处理目录
     *
     * @param file 待处理的目录
     * @throws InterruptedException 线程被中断时抛出异常
     */
    private void directoryProcess(File file) throws InterruptedException {
        File[] list = file.listFiles();  // 获取当目录中的所有文件
        if (list != null) { // 如果当前目录下有文件
            for (int i = 0; i < list.length; i++) {  // 遍布所有文件
                if (list[i].isDirectory()) { // 如果是一个目录
                    directoryProcess(list[i]); // 递归处理
                } else {
                    fileProcess(list[i]); // 如果是一个文件,调用文件处理方法
                }
            }
        }
    }

    /**
     * 文件处理方法
     *
     * @param file 待处理的文件名
     * @throws InterruptedException 线程被中断时抛出异常
     */
    private void fileProcess(File file) throws InterruptedException {
        if (file.getName().equals(this.fileName)) { // 当前文件名与要查找的文件同名,就输出信息
            System.out.printf("%s : %s\n", Thread.currentThread().getName(), file.getAbsolutePath());
        }

        if (Thread.interrupted()) {  // 程序被中断就抛出异常
            throw new InterruptedException();
        }
    }
}
package com.concurrency.core;

import com.concurrency.task.FileSearch;

import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        // 创建一个运行对象和一个运行它的线程
        FileSearch searcher = new FileSearch("C:/", "readme.txt");
        Thread thread = new Thread(searcher);

        thread.start(); // 启动线程

        try {
            TimeUnit.SECONDS.sleep(10); // 主线程休眠10s
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        thread.interrupt(); // 中断线程
    }
}

技术分享

图1.5-1 运行结果

1.6线程的休眠和恢复

  需要在某一个预期的时间中断线程的执行。例如,程序的一个线程每隔一分钟检査一次传感器状态,其余时间什么都不做。在这段空闲时间,线程不占用计 算机的任何资源。当它继续执行的CPU时钟来临时,JVM会选中它继续执行。可以通 过线程的sleep()方法来达到这个目标。  
  sleep()方法接受整型数值作为参数,以表明线程 挂起执行的毫秒数。当线程休眠的时间结束了,JVM会分给它CPU时钟,线程将继续 执行它的指令。
  sleep()方法的另一种使用方式是通过TimeUnit枚举类元素进行调用。这个方法也使用Thread类的sleep()方法来使当前线程休眠,但是它接收的参数单位是秒,最后会被转化成毫秒。

package com.concurrency.task;

import java.util.Date;
import java.util.concurrent.TimeUnit;

// 文件定时类,每隔一秒钟将实际的时间输出
public class FileClock implements Runnable {
    @Override
    public void run() {
        for (int i = 0; i < 10; i++) {
            System.out.printf("%s\n", new Date());
            try {
                // 休眠一秒
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                // 当线程被中断时,释放或者关闭线程正在使用的资源。
                System.out.printf("The FileClock has been interrupted");
                return; // 发生异常就跳出
            }
        }
    }
}
package com.concurrency.core;

import com.concurrency.task.FileClock;

import java.util.concurrent.TimeUnit;
public class Main {
    public static void main(String[] args) {
        // 创建一个文件时间运行对象,并且将其放入一个线程对象中
        FileClock clock = new FileClock();
        Thread thread = new Thread(clock);

        // 开始线程
        thread.start();
        try {
            // 等待五秒
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 中断线程
        thread.interrupt();
    }
}

技术分享

图1.6-1 部分运行结果

  当运行这个例子时,你可以看到程序每间隔一秒钟就会输出实际的时间,接下来是 FileClock线程已经被中断的信息。
  当调用sleep()方法之后,线程会释放CPU并且不再继续执行任务。在这段时间内,线程不占用CPU时钟,所以CPU可以执行其他的任务。
  如查线程休眠被中断,该方法就会立即抛出InterruptedException异常,而不需要等待到线程休眠时间结束。Java并发API还提供了另外一个方法来使线程对象释放CPU,即yield()方法,它将通 知JVM这个线程对象可以释放CPU 了。JVM并不保证遵循这个要求。通常来说,yield()方法只做调试使用。

1.7等待线程的终止

  在一些情形下,某个线程必须等待其它线程的终止。例如,程序在执行其他的任务时,必须先初始化一些必须的资源。可以使用线程来完成这些初始化任务,等待线程终止,再执行程序的其他任务。
  为了达到这个目的,使用Thread类的join()方法。当一个线程对象的join〇方法被 调用时,调用它的线程将被挂起,直到这个线程对象完成它的任务。

package com.concurrency.task;

import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 * 数据源加载器,模拟数据加载,它会休眠10s
 */
public class DataSourcesLoader implements Runnable {
    @Override
    public void run() {
        // 输出一条消息
        System.out.printf("Beginning data sources loading: %s\n",new Date());

        // 休眠10s
        try {
            TimeUnit.SECONDS.sleep(4);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 输出一条消息
        System.out.printf("Data sources loading has finished: %s\n",new Date());
    }
}
package com.concurrency.task;

import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 * 网络连接加载器,模拟网络连接,它会休眠6s
 */
public class NetworkConnectionsLoader implements Runnable {
    @Override
    public void run() {
        // 输出一条消息
        System.out.printf("Begining network connections loading: %s\n",new Date());

        // 休眠6s
        try {
            TimeUnit.SECONDS.sleep(6);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 输出一条消息
        System.out.printf("Network connections loading has finished: %s\n",new Date());
    }
}
package com.concurrency.core;

import com.concurrency.task.DataSourcesLoader;
import com.concurrency.task.NetworkConnectionsLoader;

import java.util.Date;

public class Main {
    public static void main(String[] args) {
        // 创建并启动数据源加载器
        DataSourcesLoader dsLoader = new DataSourcesLoader();
        Thread thread1 = new Thread(dsLoader, "DataSourceThread");
        thread1.start();

        // 创建并且启动网络连接加载器
        NetworkConnectionsLoader ncLoader = new NetworkConnectionsLoader();
        Thread thread2 = new Thread(ncLoader, "NetworkConnectionLoader");
        thread2.start();

        // 待待两个线程的任务完成
        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 两个任务都完成后输出一条消息
        System.out.printf("Main: Configuration has been loaded: %s\n", new Date());
    }
}

技术分享

图1.7-1 部分运行结果

  运行这个程序时,你会看到两个线程对象是如何运行的。DataSourcesLoader线程运行结束,NetworkConnectionsLoader线程也运行结束的时候,主线程对象才会继续运行并且打 印出最终的信息。
  Java提供了另外两种形式的join()方法
  join (long milliseconds)
  join (long milliseconds, long nanos)
  当一个线程调用其他某个线程的join()方法时,如果使用的是第一种join()方式,那么它不必等到被调用线程运行终止,如果参数指定的毫秒时钟已经到达,它将继续运行。例如,thread1中有这样的代码thread2.join(1000),thread1将挂起运行,直到满足下面两个条 件之一:
  ?时钟已经过去1000毫秒。
  ?thread2已经运行完成。
  当两个条件中的任何一条成立时,join()方法将返回。
  第二种join()方法跟第一种相似,只是需要接受毫秒和纳秒两个参数。

1.8守护线程的创建和运行

  Java里有一种特殊的线程叫做守护(Daemon)线程。这种线程的优先级很低,通常来说,当同一个应用程序里没有其他的线程运行的时候,守护线程才运行。当守护线程是程序中唯一运行的线程时,守护线程执行结束后,JVM也就结束了这个程序。
  因为这种特性,守护线程通常被用来做为同一程序中普通线程(也称为用户线程)的 服务提供者。它们通常是无限循环的,以等待服务请求或者执行线程的任务。它们不能做重要的工作,因为我们不可能知道守护线程什么时候能够获取CPU时钟,并且,在没有其他线程运行的时候,守护线程随时可能结束。一个典型的守护线程是Java的垃圾回收器 (Garbage Collector)。

package com.concurrency.event;

import java.util.Date;

/**
 * 事件类,存储事件信息
 */
public class Event {
    /**
     * 事件日期
     */
    private Date date;
    /**
     * 事件信息
     */
    private String event;

    /**
     * 获取事件日期
     *
     * @return 事件日期
     */
    public Date getDate() {
        return date;
    }

    /**
     * 设置事件日期
     *
     * @param date 事件日期
     */
    public void setDate(Date date) {
        this.date = date;
    }

    /**
     * 获取事件信息
     *
     * @return 事件信息
     */
    public String getEvent() {
        return event;
    }

    /**
     * 设置事件信息
     *
     * @param event 事件信息
     */
    public void setEvent(String event) {
        this.event = event;
    }
}
package com.concurrency.task;

import com.concurrency.event.Event;

import java.util.Date;
import java.util.Deque;
import java.util.concurrent.TimeUnit;

/**
 * 写事件的类,每一秒钟产生一个事件对象
 */
public class WriterTask implements Runnable {
    /**
     * 用于存储事件对象的队列
     */
    Deque<Event> deque;

    /**
     * 构造函数
     *
     * @param deque 存储事件对象的队列
     */
    public WriterTask(Deque<Event> deque) {
        this.deque = deque;
    }

    @Override
    public void run() {
        // 产生100个事件对象
        for (int i = 1; i < 100; i++) {
            // 创建和初始化事件对象
            Event event = new Event();
            event.setDate(new Date());
            event.setEvent(String.format("The thread %s has generated an event", Thread.currentThread().getId()));

            // 将事件添加对队列头部
            deque.addFirst(event);
            try {
                // 休眠一秒种
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
package com.concurrency.task;

import com.concurrency.event.Event;

import java.util.Date;
import java.util.Deque;

/**
 * 事件清除类,每隔10秒从队尾取出一个事件,并且删除这个事件
 */
public class CleanerTask extends Thread {

    /**
     * 用于存储事件对象的队列
     */
    Deque<Event> deque;

    /**
     * 构造函数
     *
     * @param deque 存储事件对象的队列
     */
    public CleanerTask(Deque<Event> deque) {
        this.deque = deque;
        setDaemon(true); // 表明当前对象是一个精灵线程
    }


    @Override
    public void run() {
        while (true) {
            Date date = new Date();
            clean(date);
        }
    }

    /**
     * 清除方法,生存时间长于10秒的事件进行清除
     * @param date 当前时间
     */
    private void clean(Date date) {
        long difference;
        boolean delete;

        if (this.deque.size() == 0) {
            return;
        }

        delete = false;
        do {
            Event e = this.deque.getLast();
            difference = date.getTime() - e.getDate().getTime(); // 计算最早的事件距离现在的时间
            if (difference > 10000) {  // 大于10秒就输出信息,并且删除最先发生的事件
                System.out.printf("Cleaner: %s\n", e.getEvent());
                deque.removeLast();
                delete = true;
            }
        } while (difference > 10000);

        if (delete) { // 有删除就输出删除后队列的大小
            System.out.printf("Cleaner: Size of the queue: %d\n", deque.size());
        }
    }
}
package com.concurrency.core;

import com.concurrency.event.Event;
import com.concurrency.task.CleanerTask;
import com.concurrency.task.WriterTask;

import java.util.ArrayDeque;
import java.util.Deque;

public class Main {
    public static void main(String[] args) {
        // 创建一个用于存放事件对象的队列
        Deque<Event> deque = new ArrayDeque<Event>();

        // 创建一个写任务的对象,并且创建三个线程去调用这个对象
        WriterTask writer = new WriterTask(deque);
        for (int i = 0; i < 3; i++) {
            Thread thread = new Thread(writer);
            thread.start();
        }

        // 创建一个事件清除任务,并且启动这个任务
        CleanerTask cleaner = new CleanerTask(deque);
        cleaner.start();
    }
}

技术分享

图1.8-1 部分运行结果

  对程序的运行输出进行分析之后发现,队列中的对象会不断增长直到30个, 然后到程序结束,队列的长度维持在27~30之间。
  这个程序有3个WriterTask线程,每个线程向列队写入一个事件,然后休眠1秒钟。 在第一个10秒钟内,队列中有30个事件,直到3个WriterTask都休眠后,CleanerTask才 开始执行,但是它没有删除任何事件。因为所有的事件都小于10秒钟。在接下来的运行中, CleanerTasl(每秒删除3个对象,同时WriterTask会写入3个对象,所以队列的长度一直介 于27~30之间。
  setDaemom()方法只能在start()方法被调用之前设置,一旦线程开始运行,将不能再修改守护状态。

1.9线程中不可控异常的处理

  在Java中有两种异常。
  ?非运行时异常(Checked Exception):这种异常必须在方法声明的throws语句指定,或者在方法体内捕获。例如:IOException和ClassNotFoundException。
  ?运行时异常(Unchecked Exception):这种异常不必在方法声明中指定,也不需要在方法体中捕获。例如:NumberFormatException。
  因为run()方法不支持throws语句,所以当线程对象的run()方法抛出非运行异常时, 我们必须捕获并且处理它们。当运行时异常从run()方法中抛出时,默认行为是在控制台输出堆栈记录并且退出程序。
  Java提供给我们一种在线程对象里捕获和处理运行时异常的一种机制。

package com.concurrency.handler;

/**
 * 异常处理类,处理线程中抛出的未捕获的异常
 */
public class ExceptionHandler implements Thread.UncaughtExceptionHandler {
    /**
     * 处理线程中抛出的未捕获的异常
     * @param t 招聘异常的线程
     * @param e 抛出的异常
     */
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        System.out.printf("An exception has been captured\n");
        System.out.printf("Thread: %s\n", t.getId());
        System.out.printf("Exception: %s: %s\n", e.getClass().getName(), e.getMessage());
        System.out.printf("Stack Trace: \n");
        e.printStackTrace(System.out);
        System.out.printf("Thread status: %s\n", t.getState());
    }
}
package com.concurrency.task;

/**
 * 任务类,专门抛出异常
 */
public class Task implements Runnable {
    @Override
    public void run() {
        // 下面的语句会招聘异常
        int number = Integer.parseInt("TTT");
    }
}
package com.concurrency.core;

import com.concurrency.handler.ExceptionHandler;
import com.concurrency.task.Task;

public class Main {
    public static void main(String[] args) {
        Task task = new Task(); // 创建一个任务
        Thread thread = new Thread(task); // 创建一个线程
        thread.setUncaughtExceptionHandler(new ExceptionHandler()); // 设置线程的异常处理器
        thread.start();

        try {
            thread.join(); // 等待线程完成
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.printf("Thread has finished\n");
    }
}

技术分享

图1.9-1 部分运行结果

  Thread类还有另一个方法可以处理未捕获到的异常,即静态方法setDefaultUncaughtExceptionHandler()。这个方法在应用程序中为所有的线程对象创建了一个异常处理器.
  当线程抛出一个未捕获到的异常时,JVM将为异常寻找以下三种可能的处理器。
  首先,它査找线程对象的未捕获异常处理器。如果找不到,JVM继续査找线程对象所在的线程组(ThreadGroup)的未捕获异常处理器,如果还是找不到,JVM将继续查找默认的未捕获异常处理器.
  如果没有一个处理器存在,JVM则将堆栈异常记录打印到控制台,并退出程序。

1.10线程局部变量的使用

  共享数据是并发程序最核心的问题之一,对于继承了Thread类或者实现了Runnable接口的对象来说尤其重要。
  如果创建的对象是实现了Runnable接口的类的实例,用它作为传入参数创建多个线程对象并启动这些线程,那么所有的线程将共享相同的属性。也就是说,如果你在一个线程中改变了一个属性,所有线程都会被这个改变影响.
  在某种情况下,这个对象的属性不需要被所有线程共享。Java并发API提供了一个干净的机制,即线程局部变量(Thread-Local Variable),其具有很好的性能。
  下面是线程共享的变量的例子。

package com.concurrency.task;

import java.sql.Time;
import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 * 线程不安全的任务,当这个任务在多个线程中时,其中的变量会被多个线程其享
 */
public class UnsafeTask implements Runnable {
    /**
     * 日期对象,被所有线程共享
     */
    private Date startDate;

    @Override
    public void run() {
        this.startDate = new Date();
        System.out.printf("Starting Thread: %s : %s\n", Thread.currentThread().getId(), startDate);
        try {
            TimeUnit.SECONDS.sleep((int) Math.rint(Math.random() * 10));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.printf("Thread Finished: %s : %s\n", Thread.currentThread().getId(), startDate);
    }
}
package com.concurrency.core;

import com.concurrency.task.UnsafeTask;

import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        // 创建线程不安全的任务
        UnsafeTask task = new UnsafeTask();

        // 将任务入进三个不同的线程中
        for (int i = 0; i < 3; i++) {
            Thread thread = new Thread(task);
            thread.start();
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

技术分享

图1.10-1 部分运行结果

  下面是使用线程局部变量。

package com.concurrency.task;

import java.util.Date;
import java.util.concurrent.TimeUnit;

public class SafeTask implements Runnable {
    /**
     * 线程局部变量,其中的内容不能共享,线程被初始化时会创建其包含的变量
     */
    private static ThreadLocal<Date> startDate = new ThreadLocal<Date>() {
        @Override
        protected Date initialValue() {
            return new Date();
        }
    };

    @Override
    public void run() {
        System.out.printf("Starting Thread: %s : %s\n", Thread.currentThread().getId(), startDate.get());
        try {
            TimeUnit.SECONDS.sleep((int) Math.rint(Math.random() * 10));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.printf("Thread Finished: %s : %s\n", Thread.currentThread().getId(), startDate.get());
    }
}
package com.concurrency.core;

import com.concurrency.task.SafeTask;

import java.util.concurrent.TimeUnit;

public class SafeMain {
    public static void main(String[] args) {
        // 创建一个任务
        SafeTask task = new SafeTask();

        // 将任务放入三个不同的线程中运行
        for (int i = 0; i < 3; i++) {
            Thread thread = new Thread(task);
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            thread.start();
        }
    }
}

技术分享

图1.10-2 部分运行结果
  安全线程类的执行结果。现在,这3个线程对象都有它们自己的startDate属性值。
  线程局部变量分别为每个线程存储了各自的属性值.并提供给每个线程使用。你可以使用get()方法读取这个值,并用set()方法设置这个值。如果线程是第一次访问线程局部变量,线程局部变量可能还没有为它存储值,这个时候initialValue()方法就会被调用,并且返回当前的时间值。
  线程局部变量也提供了remove()方法,用来为访问这个变量的线程删除己经存储的值。 Java并发API包含了 InheritableThreadLocal类,如果一个线程是从其他某个线程中创建的, 这个类将提供继承的值。如果一个线程A在线程局部变量B有值,当它创建其他某个线程 B时,线程B的线程局部变量将跟线程A是一样的。可以覆盖childValue()方法,这个方法用来初始化子线程在线程局部变量中的值,它使用父线程在线程局部变量中的值作为传入参数。

1.11线程的分组

  Java并发API提供了一个功能,它能够把线程分组。这允许我们把一个组的线程当成一个单一的单元,对组内线程对象进行访问并操作它们。例如,对于一些执行同样任务的线程,你想控制它们,不管多少线程在运行,只需要一个单一的调用,所有这些线 程的运行都会被中断.
  Java提供ThreadGroup类表示一组线程。线程组可以包含线程对象,也可以包含其他的线程组对象,它是一个树形结构。

package com.concurrency.task;

/**
 * 结果类用于存储搜索结果
 */
public class Result {
    /**
     * 完成任务的线程名
     */
    private String name;

    /**
     * 获取完成任务的线程名
     * @return  完成任务的线程名
     */
    public String getName() {
        return name;
    }

    /**
     * 设置完成任务的线程名
     * @param name 完成任务的线程名
     */
    public void setName(String name) {
        this.name = name;
    }
}
package com.concurrency.task;

import java.util.Date;
import java.util.Random;
import java.util.concurrent.TimeUnit;

public class SearchTask implements Runnable {

    /**
     * 如果线程完成了任务,并且没有中断,就存储线程的名字。
     */
    private Result result;

    /**
     * 构造函数
     *
     * @param result 结果对象
     */
    public SearchTask(Result result) {
        this.result = result;
    }

    @Override
    public void run() {
        String name = Thread.currentThread().getName();
        System.out.printf("Thread %s: Start\n", name);
        try {
            doTask();
            result.setName(name);
        } catch (InterruptedException e) {
            System.out.printf("Thread %s: Interrupted\n", name);
            return;
        }
        System.out.printf("Thread %s: End\n", name);
    }

    /**
     * 模拟搜索操作
     *
     * @throws InterruptedException 中断异常
     */
    private void doTask() throws InterruptedException {
        Random random = new Random((new Date()).getTime());
        int value = (int) (random.nextDouble() * 100);
        System.out.printf("Thread %s: %d\n", Thread.currentThread().getName(), value);
        TimeUnit.SECONDS.sleep(value);
    }
}
package com.concurrency.core;

import com.concurrency.task.Result;
import com.concurrency.task.SearchTask;

import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {

        // 创建一个线程组
        ThreadGroup threadGroup = new ThreadGroup("Searcher");
        // 创建一个结果对象
        Result result = new Result();

        // 创建一个搜索任务,并且创建5个线程去运行这个任务
        SearchTask searchTask = new SearchTask(result);
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(threadGroup, searchTask);
            thread.start();
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }

        // 输出线程组的信息
        System.out.printf("Number of Threads: %d\n", threadGroup.activeCount());
        System.out.printf("Information about the Thread Group\n");
        threadGroup.list(); // 将有关此线程组的信息打印到标准输出。

        Thread[] threads = new Thread[threadGroup.activeCount()]; // 返回此线程组中活动线程的估计数。
        threadGroup.enumerate(threads); // 把此线程组及其子组中的所有活动线程复制到指定数组中。
        for (int i = 0; i < threadGroup.activeCount(); i++) {
            System.out.printf("Thread %s: %s\n", threads[i].getName(), threads[i].getState());
        }

        // 等待线程结束
        waitFinish(threadGroup);

        // 中断线程组中的所有线程
        threadGroup.interrupt();

    }

    /**
     * 等待线程组中的一个线程结束
     *
     * @param threadGroup 线程组
     */
    private static void waitFinish(ThreadGroup threadGroup) {
        while (threadGroup.activeCount() > 9) { // 如果线程组中的活动线程数大于9个,当前调用线程就休眠1秒,直到线程数小于9个
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

技术分享

图1.2-1 部分运行结果

1.12线程组中不可控异常的处理

  建立一个方法来捕获线程组中的任何线程对象抛出的非捕获异常。

package com.concurrency.task;

import java.util.Random;

public class Task implements Runnable {
    @Override
    public void run() {
        int result;
        // 创建一个随机数生成器
        Random random = new Random(Thread.currentThread().getId());
        while (true) {
            // 生成一个[0, 1000)内有随机整数,并且有1000除以这个数,求得商
            result = 1000 / ((int) (random.nextDouble() * 1000));
            System.out.printf("%s : %d\n", Thread.currentThread().getId(), result);
            // 检测当前线程是否被中断
            if (Thread.currentThread().isInterrupted()) {
                System.out.printf("%d : Interrupted\n", Thread.currentThread().getId());
                return;
            }
        }
    }
}
package com.concurrency.group;

public class MyThreadGroup extends ThreadGroup {
    /**
     * 构造函数
     *
     * @param name 线程组名称
     */
    public MyThreadGroup(String name) {
        super(name);
    }

    /**
     * 重写未捕获的异常方法
     *
     * @param t 抛出异常的信息
     * @param e 抛出的异常
     */
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        // 打印线程的名称
        System.out.printf("The thread %s has thrown an Exception\n", t.getId());
        // 输出异常栈信息
        e.printStackTrace(System.out);
        // 中断线程组中其余的线程
        System.out.printf("Terminating the rest of the Threads\n");
        interrupt();
    }
}
package com.concurrency.core;

import com.concurrency.group.MyThreadGroup;
import com.concurrency.task.Task;

public class Main {

    public static void main(String[] args) {
        // 创建一个自定义的线程组
        MyThreadGroup threadGroup = new MyThreadGroup("MyThreadGroup");
        // 创建一个任务
        Task task = new Task();
        // 创建两个线程,将其放入同一个线程组中,并且执行同一个任务
        for (int i = 0; i < 2; i++) {
            Thread t = new Thread(threadGroup, task);
            t.start();
        }
    }
}

技术分享

图1.12-1 部分运行结果

  运行时.你会看到当一个线程对象抛出了异常,其余的线程对象都被中断. 当线程抛出非捕获异常时,JVM将为这个异常寻找3种可能的处理器.
  首先,寻找抛出这个异常的线程的非捕获异常处理器,如果这个处理器不存在,JVM继续査找这个线程所在的线程组的非捕获异常处理器。如果也不存在JVM将寻找默认的非捕获异常处理器,如果这些处理器都不存在,JVM将把堆栈中异常信息打印到控制台,并且退出这个程序。

1.13使用工厂类创建线程

  工厂模式是面向对象编程中最常使用的模式之一.它是一个创建者模式,使用一个类为其他的一个或者多个类创建对象。当我们要为这些类创建对象时,不需再使用new构造 器,而使用工厂类。
  使用工厂类,可以将对象的创建集中化,这样做有以下的好处:
  ?更容易修改类,或者改变创建对象的方式i
  ?更容易为有限资源限制创建对象的数目,例如,可以限制一个类型的对象不多于1个。
  ?更容易为创建的对象生成统计数据。
  Java提供了 ThreadFactory接口,这个接口实现了线程对象工厂》Java并发API的高级工具类也使用了线程工厂创建线程。

package com.concurrency.factory;

import com.sun.javafx.beans.annotations.NonNull;

import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadFactory;


public class MyThreadFactory implements ThreadFactory {

    private int counter; // 线程计数器,计数已经产生了多少个线程
    private String name; // 线程的基准名称
    private List<String> stats;  // 线程统计信息集合

    /**
     * 构造函数
     *
     * @param name 线程对象的基准名称
     */
    public MyThreadFactory(String name) {
        this.name = name;
        this.counter = 0;
        this.stats = new ArrayList<String>();
    }

    /**
     * 使用Runnable对象创建一个线程
     *
     * @param r Runnable对象
     * @return 线程对象
     */
    @Override
    public Thread newThread(Runnable r) {
        // 创建一个新的线程对象
        Thread t = new Thread(r, this.name + "-Thread_" + this.counter);
        this.counter++;
        // Actualize the statistics of the factory
        this.stats.add(String.format("Created thread %d with name %s on %s\n", t.getId(), t.getName(), new Date()));
        return t;
    }

    /**
     * 获取线程工厂的统计信息
     *
     * @return 线程工厂的统计信息
     */
    public String getStats() {
        StringBuffer buffer = new StringBuffer();
        Iterator<String> it = stats.iterator();

        while (it.hasNext()) {
            buffer.append(it.next());
        }

        return buffer.toString();
    }
}
package com.concurrency.task;

import java.util.concurrent.TimeUnit;

public class Task implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
package com.concurrency.core;

import com.concurrency.factory.MyThreadFactory;
import com.concurrency.task.Task;

public class Main {
    public static void main(String[] args) {
        // 创建一个线程工厂
        MyThreadFactory factory = new MyThreadFactory("MyThreadFactory");
        // 创建一个任务
        Task task = new Task();
        Thread thread;

        // 创建并且启动10个线程对象
        System.out.printf("Starting the Threads\n");
        for (int i = 0; i < 10; i++) {
            thread = factory.newThread(task);
            thread.start();
        }
        // 打印线程工厂的统计信息
        System.out.printf("Factory stats:\n");
        System.out.printf("%s\n", factory.getStats());
    }
}

技术分享

图1.13-1 部分运行结果

  ThreadFactory接口只有一个方法,即newThread,它以Runnable接口对象作为传入参数并且返回一个线程对象.当实现ThreadFactory接口时,必须实现覆盖这个方法。大多数基本的线程工厂类只有一行,即:
return new Thread(r);
可以通过增加一些变化来强化实现方法覆盖。
  ?创建一个个性化线程,如本例使用一个特殊的格式作为线程名,或者通过继承 Thread类来创建自己的线程类:
  ?保存新创建的线程的统计数据,如本节的例子那样;
  ?限制刨建的线程的数量;
  ?对生成的线程进行验证;
  ?更多你可以想到的.
  使用工厂设计模式是一个很好的编程实践,但是,如果是通过实现ThreadFactory接口来创建线程,你必须检查代码,以保证所有的线程都是使用这个工厂创建的。

欢迎转载,转载请注明出处

版权声明:本文为博主原创文章,未经博主允许不得转载。

第一章线程管理

标签:java   线程   并发   

原文地址:http://blog.csdn.net/derrantcm/article/details/48101447

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!