码迷,mamicode.com
首页 > 其他好文 > 详细

生产者消费者模型——wait/notify/notifyAll使用

时间:2016-04-24 18:49:17      阅读:139      评论:0      收藏:0      [点我收藏+]

标签:

告警系统架构如下

1、 数据处理系统处理完原始数据并入库后,发送消息到kafka系统;

2、 告警生产者从kafka系统查询消息存入告警消息队列;

3、 告警消费者从告警消息队列查询消息进行处理。

这显然是生产者消费者模型,一个告警消息生产者,多个告警消息消费者。生产者生产消息过快会产生消息积压,生产者生产消息过慢不能充分利用硬件资源。所以必须要生产者和消费者协同处理,使得系统充分利用。具体做法是消息队列为空时,消费者通知生产者生产消息,生产者生产好消息后,通知消费者处理消息。Java中我们使用的对象锁以及wait/notify方法进行线程通信,原理如下:

生产者——循环(获取锁synchronized,释放锁wait(等待被唤醒),生产消息)

消费者——循环(获取锁,消息空则notify/消息不空则消费消息)

消息队列代码:

package com.coshaho.threadpool;

import java.util.ArrayList;
import java.util.List;

/**
 * 消息队列
 * @author coshaho
 */
public class MessageQueue 
{
	List<String> messageList = new ArrayList<String>();
	
	public boolean isEmpty()
	{
		return messageList.isEmpty();
	}
	
	/**
	 * 消费消息
	 * @return
	 */
	public String consumeMessage()
	{
		return messageList.remove(0);
	}
	
	/**
	 * 生产消息
	 * @param msg
	 */
	public void produceMessage(String msg)
	{
		messageList.add(msg);
	}
}

 消费者代码:

package com.coshaho.threadpool;

/**
 * 消费者,消息队列为空则唤醒生产者
 * @author h00219638
 *
 */
public class Customer implements Runnable
{
	String name;
	MessageQueue msgQueue;
	public Customer(String name, MessageQueue msgQueue)
	{
		this.name = name;
		this.msgQueue = msgQueue;
	}

	@Override
	public void run() 
	{
		while(true)
		{
			// 消费消息
			synchronized(msgQueue)
			{
				// 如果消息队列为空,唤醒生产者生产消息
				if(msgQueue.isEmpty())
				{
					msgQueue.notify();
				}
				// 消息队列不为空,则消费消息
				else
				{
					String msg = msgQueue.consumeMessage();
					System.out.println("Customer " + name + " consumed message: " + msg + ".");
				}
			}
			try 
			{
				Thread.sleep(1000);
			} 
			catch (InterruptedException e) 
			{
				e.printStackTrace();
			}
		}
	}
}

 生产者代码:

package com.coshaho.threadpool;

/**
 * 生产者,消息队列为空时,消费者会唤醒生产者生产消息
 * @author coshaho
 */
public class Producer implements Runnable
{
	String name;
	MessageQueue msgQueue;
	public Producer(String name, MessageQueue msgQueue)
	{
		this.name = name;
		this.msgQueue = msgQueue;
	}
	
	@Override
	public void run() 
	{
		int i = 0;
		while(true)
		{
			synchronized(msgQueue)
			{
				try 
				{
					// 释放msgQueue的锁,等待消费者唤醒
					msgQueue.wait();
				} 
				catch (InterruptedException e) 
				{
					e.printStackTrace();
				}
				// 唤醒后生产消息
				for(int j = 0; j < 5; j++)
				{
					msgQueue.produceMessage("message_" + i + "_" + j);
				}
			}
			System.out.println("Producer " + name + " produced 5 messages.");
		}
	}

}

 消息处理进程代码:

package com.coshaho.threadpool;

/**
 * 消息处理中心
 * @author coshaho
 */
public class MessageProcessCenter 
{
	public static void main(String[] args)
	{
		// 初始化消息队列
		MessageQueue msgQueue = new MessageQueue();
		// 运行1个生产者
		new Thread(new Producer("Producer", msgQueue)).start();
		
		// 运行3个消费者
		new Thread(new Customer("Customer1", msgQueue)).start();
		new Thread(new Customer("Customer2", msgQueue)).start();
		new Thread(new Customer("Customer3", msgQueue)).start();
	}
}

 运行结果:

Producer Producer produced 5 messages.
Customer Customer2 consumed message: message_0_0.
Customer Customer3 consumed message: message_0_1.
Customer Customer2 consumed message: message_0_2.
Customer Customer1 consumed message: message_0_3.
Customer Customer3 consumed message: message_0_4.
Producer Producer produced 5 messages.
Customer Customer3 consumed message: message_0_0.
Customer Customer1 consumed message: message_0_1.
Customer Customer2 consumed message: message_0_2.
Customer Customer1 consumed message: message_0_3.
Customer Customer3 consumed message: message_0_4.
Producer Producer produced 5 messages.
Customer Customer3 consumed message: message_0_0.
Customer Customer1 consumed message: message_0_1.
Customer Customer2 consumed message: message_0_2.
Customer Customer3 consumed message: message_0_3.
Customer Customer1 consumed message: message_0_4.

 

生产者消费者模型——wait/notify/notifyAll使用

标签:

原文地址:http://www.cnblogs.com/coshaho/p/5427517.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!