public static void main(String[] args) { Thread thread = new Thread(){ @Override public void run() { while(true){ try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("1:" + Thread.currentThread().getName()); } } }; thread.start(); Thread thread2 = new Thread(new Runnable(){ @Override public void run() { while(true){ try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("1:" + Thread.currentThread().getName()); } } }); thread2.start(); }
一种工具,线程用其安排以后在后台线程中执行任务,可安排任务执行一次,或者定期重复执行,与每个 Timer 对象相对应的是单个后台线程,用于顺序地执行所有计时器任务.
public static void main(String[] args) throws InterruptedException {//下面两个会同时执行,Timer也是一个线程 new Timer().schedule(new TimerTask() { @Override public void run() { System.out.println("bombing!"); } }, 10000,1000); //10s之后开始执行任务,之后每一秒执行一次 new Thread(){ @Override public void run() { while(true){ try { Thread.sleep(100); } catch (InterruptedException e) {} System.out.println("wangwei"); } } }.start();
class Outputer{ //共享资源封装成类 public void output(String name){ int len = name.length(); synchronized (Outputer.class) { for(int i=0;i<len;i++){ System.out.print(name.charAt(i)); } System.out.println(); } } public synchronized void output2(String name){ int len = name.length(); for(int i=0;i<len;i++){ System.out.print(name.charAt(i)); } System.out.println(); } } public class TraditionalThreadSynchronized { Outputer outputer = new Outputer(); public static void main(String[] args) { new TraditionalThreadSynchronized().init(); } private void init(){ new Thread(new Runnable(){ public void run() { while(true) outputer.output("zhangxiaoxiang"); } }).start(); new Thread(new Runnable(){ public void run() { while(true) outputer.output("lihuoming"); } }).start(); } }
public class TraditionalThreadCommunication { public static void main(String[] args) { final Business business = new Business(); new Thread(new Runnable() { @Override public void run() { for(int i=1;i<=50;i++){ business.sub(i); } } }).start(); for(int i=1;i<=50;i++){ business.main(i); } } } class Business { private boolean bShouldSub = true; public synchronized void sub(int i){ while(!bShouldSub){ try { this.wait(); } catch (InterruptedException e) {} } for(int j=1;j<=10;j++){ System.out.println("sub thread sequence of " + j + ",loop of " + i); } bShouldSub = false; this.notify(); } public synchronized void main(int i){ while(bShouldSub){ try { this.wait(); } catch (InterruptedException e) {} } for(int j=1;j<=100;j++){ System.out.println("main thread sequence of " + j + ",loop of " + i); } bShouldSub = true; this.notify(); } }
如果每个线程执行的代码相同,可以使用同一个Runnable对象,这个Runnable对象中有那个共享数据,例如,买票系统就可以这么做. 如果每个线程执行的代码不同,这时候需要用不同的Runnable对象,如下方式来实现这些Runnable对象之间的数据共享: 将共享数据封装在另外一个对象中,然后将这个对象逐一传递给各个Runnable对象.每个线程对共享数据的操作方法也分配到那个对象身上去完成,这样容易实现针对该数据进行的各个操作的互斥和通信. 总之,要同步互斥的几段代码最好是分别放在几个独立的方法中,这些方法再放在同一个类中,这样比较容易实现它们之间的同步互斥和通信. 极端且简单的方式,即在任意一个类中定义一个static的变量,这将被所有线程共享.
public class MultiThreadShareData { private static ShareData data1 = new ShareData(); //被所有线程共享 public static void main(String[] args) { ShareData data = new ShareData(); new Thread(new MyRunnable1(data)).start(); new Thread(new MyRunnable2(data)).start(); } } class MyRunnable1 implements Runnable{ private ShareData data; public MyRunnable1(ShareData data){ this.data = data; } public void run() { data.decrement(); } } class MyRunnable2 implements Runnable{ private ShareData data; public MyRunnable2(ShareData data){ this.data = data; } public void run() { data.increment(); } }
//共享数据类 class ShareData{ /*implements Runnable 这种代码不好,没有将共享数据和线程分开,不利于维护 private int count = 100; @Override public void run() { // TODO Auto-generated method stub while(true){ count--; } }*/ private int j = 0; public synchronized void increment(){ j++; } public synchronized void decrement(){ j--; } }
当然ThreadLocal并不能替代同步机制,两者面向的问题领域不同.同步机制是为了同步多个线程对相同资源的并发访问,是为了多个线程之间进行通信 的有效方式;而ThreadLocal是隔离多个线程的数据共享,从根本上就不在多个线程之间共享资源(变量),这样当然不需要对多个线程进行同步了.所以,如果你需要进行多个线程之间进行通信,则使用同步机制;如果需要隔离多个线程之间的共享冲突,可以使用ThreadLocal,这将极大地简化你的程序,使程序更加易读、简洁.
public class ThreadLocalTest { private static ThreadLocal<Integer> x=new ThreadLocal<Integer>(); private static ThreadLocal<MyThreadScopeData> myThreadScopeData=
new ThreadLocal<MyThreadScopeData>(); public static void main(String[] args) { for(int i=0;i<2;i++){ new Thread(new Runnable(){ @Override public void run() { int data = new Random().nextInt(); System.out.println(Thread.currentThread().getName() + " has put data :" + data); x.set(data); MyThreadScopeData myData = new MyThreadScopeData(); myData.setName("name" + data); myData.setAge(data); myThreadScopeData.set(myData); int data1 = x.get(); System.out.println("A from " + getName()+" get data :" + data1); MyThreadScopeData myData1 = myThreadScopeData.get();; System.out.println("A from " + getName()+" getMyData: " + myData1.getName()); } }).start(); } } } class MyThreadScopeData{ private String name; private int age; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } }
public class ThreadScopeShareData { private static int data = 0; private static Map<Thread, Integer> threadData = new HashMap<Thread, Integer>(); public static void main(String[] args) { for(int i=0;i<2;i++){ new Thread(new Runnable(){ @Override public void run() { data = new Random().nextInt(); System.out.println(Thread.currentThread().getName() + " has put data :" + data); threadData.put(Thread.currentThread(), data); new A().get(); new B().get(); } }).start(); } } static class A{ public void get(){ int data = threadData.get(Thread.currentThread()); System.out.println("A from " + Thread.currentThread().getName() + " get data :" + data); } } static class B{ public void get(){ int data = threadData.get(Thread.currentThread()); System.out.println("B from " + Thread.currentThread().getName() + " get data :" + data); } } }
private static final ThreadLocal threadSession = new ThreadLocal(); public static Session getSession() throws InfrastructureException { Session s = (Session) threadSession.get(); try { if (s == null) { s = getSessionFactory().openSession(); threadSession.set(s); } } catch (HibernateException ex) { throw new InfrastructureException(ex); } return s; }
执行已提交的 Runnable对象的任务.此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法.通常使用Executor而不是显式地创建线程.例如下所示,而不是为一组任务中的每个任务调用 new Thread(new(RunnableTask())).start().
import java.util.concurrent.Executor; class TestRunnable implements Runnable{ @Override public void run() { for(int i=0;i<10;i++) System.out.println("ww"); } } class MyExecutor implements Executor{ @Override public void execute(Runnable command) { new Thread(command).start(); } } public class TestExecutor extends Thread{ public static void main(String[] args) { TestRunnable t = new TestRunnable(); Executor executor = new MyExecutor(); executor.execute(t); } }
大多数服务器应用程序(如 Web 服务器、POP 服务器、数据库服务器或文件服务器)代表远程客户机处理请求,这些客户机通常使用 socket 连接到服务器.对于每个请求,通常要进行少量处理(获得该文件的代码块,并将其发送回 socket),但是可能会有大量(且不受限制)的客户机请求服务.
用于构建服务器应用程序的简单化模型会为每个请求创建新的线程.下列代码段实现简单的 Web 服务器,它接受端口 80 的 socket 连接,并创建新的线程来处理请求.不幸的是,该代码不是实现 Web 服务器的好方法,因为在重负载条件下它将失败,停止整台服务器.
class UnreliableWebServer { public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(80); while (true) { final Socket connection = socket.accept(); Runnable r = new Runnable() { public void run() { handleRequest(connection); } }; new Thread(r).start(); } } }
无论如何,这样使用资源可能会损害性能.如果创建过多线程,其中每个线程都将占用一些 CPU 时间,结果将使用许多内存来支持大量线程,每个线程都运行得很慢。这样就无法很好地使用计算资源,管理一大组小任务的标准机制是组合工作队列和线程池.工作队列就是要处理的任务的队列,前面描述的 Queue 类完全适合.线程池是线程的集合,每个线程都提取公用工作队列.当一个工作线程完成任务处理后,它会返回队列,查看是否有其他任务需要处理.如果有,它会转移到下一个任务,并开始处理,如下是一个使用线程池的简单网络服务:
class NetworkService implements Runnable { private final ServerSocket serverSocket; private final ExecutorService pool; public NetworkService(int port, int poolSize) throws IOException { serverSocket = new ServerSocket(port); pool = Executors.newFixedThreadPool(poolSize); } public void run() { // run the service try { for (;;) { pool.execute(new Handler(serverSocket.accept())); } } catch (IOException ex) { pool.shutdown(); } } } class Handler implements Runnable { private final Socket socket; Handler(Socket socket) { this.socket = socket; } public void run() { // read and service request on socket } }
public class ThreadPoolTest { public static void main(String[] args) { //可重用固定线程数的线程池 ExecutorService threadPool = Executors.newFixedThreadPool(3); //可根据需要创建新线程的缓存线程池 //ExecutorService threadPool = Executors.newCachedThreadPool(); //单个线程的线程池 //ExecutorService threadPool = Executors.newSingleThreadExecutor(); //一个可以在给定延迟后运行命令或者定期地执行的线程池 ScheduledExecutorService ScheduledThreadPool = Executors.newScheduledThreadPool(3); for(int i=1;i<=10;i++){ final int task = i; threadPool.execute(new Runnable(){ @Override public void run() { for(int j=1;j<=10;j++){ System.out.println(Thread.currentThread().getName() + " is looping of " + j
+ " for task of " + task); } } }); } ScheduledThreadPool.scheduleAtFixedRate(new Runnable(){ public void run(){ System.out.println("bombing!"); } }, 10, 1, TimeUnit.SECONDS); } }
public class LockTest { public static void main(String[] args) { new LockTest().init(); } private void init(){ final Outputer outputer = new Outputer(); new Thread(new Runnable(){ @Override public void run() { while(true){ outputer.output("zhangxiaoxiang"); } } }).start(); new Thread(new Runnable(){ @Override public void run() { while(true){ outputer.output("lihuoming"); } } }).start(); } } class Outputer{ Lock lock = new ReentrantLock(); public void output(String name){ int len = name.length(); lock.lock(); try{ for(int i=0;i<len;i++){ System.out.print(name.charAt(i)); } System.out.println(); }finally{ lock.unlock(); } } }
与互斥锁相比,读-写锁允许对共享数据进行更高级别的并发访问.虽然一次只有一个线程(writer 线程)可以修改共享数据,但在许多情况下,任何数量的线程可以同时读取共享数据(reader 线程),读-写锁利用了这一点.使用读-写锁所允许的并发性增强将带来更大的性能提高.
public class ReadWriteLockTest { public static void main(String[] args) { final Queue q = new Queue(); for(int i=0;i<3;i++){ new Thread(){ public void run(){ while(true){ q.get(); } } }.start(); new Thread(){ public void run(){ while(true){ q.put(new Random().nextInt(10000)); } } }.start(); } } } class Queue{ private Object data = null;//共享数据,只能有一个线程能写该数据,但可以有多个线程同时读该数据. ReadWriteLock rwl = new ReentrantReadWriteLock(); public void get(){ rwl.readLock().lock(); try { System.out.println(Thread.currentThread().getName() + "be ready to read data!"); Thread.sleep((long)(Math.random()*100)); System.out.println(Thread.currentThread().getName() + "have read data :" + data); } catch (InterruptedException e) { e.printStackTrace(); }finally{ rwl.readLock().unlock(); } } public void put(Object data){ rwl.writeLock().lock(); try { System.out.println(Thread.currentThread().getName() + " be ready to write data!"); Thread.sleep((long)(Math.random()*1000)); this.data = data; System.out.println(Thread.currentThread().getName() + " have write data: " + data); } catch (InterruptedException e) { e.printStackTrace(); }finally{ rwl.writeLock().unlock(); } } }
public class TestConditon { public static void main(String[] args) { final Business business = new Business(); new Thread(new Runnable() { public void run() { for (int i = 1; i <= 50; i++) { business.sub(i); } } }).start(); for (int i = 1; i <= 50; i++) { business.main(i); } } } class Business { Lock lock = new ReentrantLock(); Condition notFull = lock.newCondition(); Condition notEmpty = lock.newCondition(); private boolean bShouldSub = true; public void sub(int i) { try { lock.lock(); while (!bShouldSub) { try { notFull.await(); } catch (InterruptedException e) { } } for (int j = 1; j <= 10; j++) { System.out.println("sub thread sequence of " + j + ",loop of " + i); } bShouldSub = false; notEmpty.signal(); } finally { lock.unlock(); } } public void main(int i) { try { lock.lock(); while (bShouldSub) { try { notEmpty.await(); } catch (InterruptedException e) { } } for (int j = 1; j <= 100; j++) { System.out.println("main thread sequence of " + j + ",loop of " + i); } bShouldSub = true; notFull.signal(); } finally { lock.unlock(); } } }
class Product { private int id; Product(int id) { this.id = id; } public String toString() { return "Product [id=" + id + "]"; } } class Box { Lock lock = new ReentrantLock(); Condition putConditon = lock.newCondition(); Condition popConditon = lock.newCondition(); Product[] p = new Product[10]; // 此处可以定义为一个队列 int index = 0; public void put(Product pro) { try { lock.lock(); while (index == p.length) { try { putConditon.await(); } catch (InterruptedException e) { } } p[index] = pro; index++; popConditon.signal(); } finally { lock.unlock(); } } public Product pop() { try { lock.lock(); while (index == 0) { try { popConditon.await(); } catch (InterruptedException e) { } } putConditon.signal(); index--; return p[index]; } finally { lock.unlock(); } } } class Producter implements Runnable { Box box = null; Producter(Box box) { this.box = box; } public void run() { for (int i = 0; i < 20; i++) { // 每个生产者生产20个 Product pro = new Product(i); box.put(pro); System.out.println(Thread.currentThread().getName() + "生产了:" + pro); } } } class Customer implements Runnable { Box box = null; Customer(Box box) { this.box = box; } public void run() { while (true) { Product pro = box.pop(); System.out.println(Thread.currentThread().getName() + "消费了:" + pro); } } } public class TestCondions { public static void main(String[] args) { Box box = new Box(); Producter p = new Producter(box); Customer c = new Customer(box); new Thread(p).start(); new Thread(p).start(); new Thread(p).start(); new Thread(c).start(); new Thread(c).start(); new Thread(c).start(); } }
public class SemaphoreTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final Semaphore sp = new Semaphore(3); for(int i=0;i<10;i++){ Runnable runnable = new Runnable(){ public void run(){ try { sp.acquire(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { //处理业务 Thread.sleep((long)(Math.random()*10000)); } catch (InterruptedException e) { e.printStackTrace(); } sp.release(); } }; service.execute(runnable); } } }