
Kafka demo


package com.paic.kafka.mq.consumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.paic.kafka.mq.producer.PropertiesFileReader;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import org.springframework.stereotype.Component;


@Component
public class Consumer {
	private static ConsumerConnector consumer;
	private static String topic;
	private ExecutorService executor;
	private int numThreads = 1;

	public Consumer() {
		
	}
	static {
		consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
	}
	public void shutdown() {
		if (consumer != null) {
			consumer.shutdown();
		}
		if (executor != null) {
			executor.shutdown();
			try {
				// wait briefly for in-flight message handling to finish
				if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
					System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
				}
			} catch (InterruptedException e) {
				System.out.println("Interrupted during shutdown, exiting uncleanly");
			}
		}
	}

	public void run() {
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put(topic, numThreads);
		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
		executor = Executors.newFixedThreadPool(numThreads);
		int threadNumber = 0;
		for (final KafkaStream<byte[], byte[]> stream : streams) {
			executor.submit(new ConsumerThread(stream, threadNumber));
			threadNumber++;
		}
	}

	private static ConsumerConfig createConsumerConfig() {
		String fileName = "com/tv189/interAction/mq/config/Consumer.properties";
		Properties props = PropertiesFileReader.initialize(fileName);

		String topicFileName = "com/tv189/interAction/mq/config/Topic.properties";
		Properties topicProps = PropertiesFileReader.initialize(topicFileName);
		topic = topicProps.getProperty("UserWinningPraise");
		return new ConsumerConfig(props);

	}

}
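run() above submits one ConsumerThread per stream, but the original post never includes that class. Below is a minimal sketch of what it presumably looks like, following the standard high-level-consumer pattern of blocking on the stream iterator; only the constructor signature is dictated by the call in run(), so the field names and console output are assumptions.

package com.paic.kafka.mq.consumer;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerThread implements Runnable {
	private final KafkaStream<byte[], byte[]> stream;
	private final int threadNumber;

	public ConsumerThread(KafkaStream<byte[], byte[]> stream, int threadNumber) {
		this.stream = stream;
		this.threadNumber = threadNumber;
	}

	public void run() {
		// hasNext() blocks until a message arrives or the connector is shut down
		ConsumerIterator<byte[], byte[]> it = stream.iterator();
		while (it.hasNext()) {
			System.out.println("Thread " + threadNumber + ": " + new String(it.next().message()));
		}
		System.out.println("Shutting down thread: " + threadNumber);
	}
}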

package com.paic.kafka.mq.consumer;

import org.apache.log4j.Logger;

/**
 * @Author: zkh
 * @Create Date: 2015-04-03
 * @Purpose:
 * @Modified By:
 * @Modified Date:
 * @Why & What is modified
 * @Version: V1.0
 *
 */

public class ConsumerInit {
	Consumer consumer = new Consumer();

	Logger logger = Logger.getLogger(ConsumerInit.class.getName());
	public void consumerServicerStart() {
		logger.info("################consumerServicerStart################");
		consumer.run();
		logger.info("################consumerServicerEnd################");
	}
}



package com.paic.kafka.mq.message;

/**
 * Created by zhangkh on 2016/6/14.
 */
public class Message {
    String msg;
    String name;
    String age;

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAge() {
        return age;
    }

    public void setAge(String age) {
        this.age = age;
    }
}
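The Message POJO is never actually wired into the producer in this post. Since the producer is configured with kafka.serializer.StringEncoder, one plausible way to send it is to serialize it to JSON first. The sketch below assumes a JSON library such as Gson on the classpath; the class name is made up for illustration.

package com.paic.kafka.mq;

import com.google.gson.Gson;
import com.paic.kafka.mq.message.Message;
import com.paic.kafka.mq.producer.ProducerApi;

public class MessageJsonSendExample {
    public static void main(String[] args) {
        Message message = new Message();
        message.setName("zkh");
        message.setAge("30");
        message.setMsg("hello kafka");

        // StringEncoder expects a String payload, so serialize the POJO to JSON
        String json = new Gson().toJson(message);
        new ProducerApi().userWinningPraiseSend("key", json);
    }
}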


package com.paic.kafka.mq.producer;

import java.util.Properties;

import org.apache.log4j.Logger;

import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * @Author: zkh
 * @Create Date: 2015-04-02
 * @Purpose:
 * @Modified By:
 * @Modified Date:
 * @Why & What is modified
 * @Version: V1.0
 *
 */
public class Producer  {
	Logger userWinningPraiseProducerLogger = Logger.getLogger(Producer.class.getName());

	private static class SingletonHolder {
		private static Producer instance = new Producer();
	}

	private kafka.javaapi.producer.Producer<String, String> kafkaProducer;
	private String topic;

	public void producerInit() {
		String fileName = "com/tv189/interAction/mq/config/Producer.properties";
		Properties props = PropertiesFileReader.initialize(fileName);
		ProducerConfig config = new ProducerConfig(props);
		kafkaProducer = new kafka.javaapi.producer.Producer<String, String>(config);

		String topicFileName = "com/tv189/interAction/mq/config/Topic.properties";
		Properties topicProps = PropertiesFileReader.initialize(topicFileName);
		topic = topicProps.getProperty("UserWinningPraise");
		userWinningPraiseProducerLogger.info("*******************userWinningPraiseProducerLogger initialization success*******************");

	}

	private Producer() {
		producerInit();

	}

	public static Producer getInstance() {
		return SingletonHolder.instance;
	}

	public void push(String key, String message) {
		try {
			KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, key, message);
			kafkaProducer.send(data);
			userWinningPraiseProducerLogger.info("KafkaSend: " + message);
		} catch (Exception e) {
			userWinningPraiseProducerLogger.error("KafkaSend failed: " + message, e);
		}

	}

}
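A note on the design: the nested SingletonHolder defers construction, and therefore producerInit(), until getInstance() is first called, and JVM class initialization guarantees that first construction is thread-safe without explicit locking (the initialization-on-demand holder idiom).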


package com.paic.kafka.mq.producer;
public class ProducerApi {

	public void userWinningPraiseSend(String key, String message) {
		Producer.getInstance().push(key, message);
	}
}


package com.paic.kafka.mq.producer;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * @Author: zkh
 * @Create Date: 2015-04-02
 * @Purpose:just for read properties file
 * @Modified By:
 * @Modified Date:
 * @Why & What is modified
 * @Version: V1.0
 *
 */
public final class PropertiesFileReader {

	private PropertiesFileReader() {

	}

	public static Properties initialize(String filePath) {
		Properties prop = new Properties();
		// try-with-resources closes the stream; guard against a missing classpath resource
		try (InputStream in = PropertiesFileReader.class.getClassLoader().getResourceAsStream(filePath)) {
			if (in == null) {
				throw new IOException("properties file not found on classpath: " + filePath);
			}
			prop.load(in);
		} catch (IOException e) {
			e.printStackTrace();
		}
		return prop;
	}
}
package com.paic.kafka.mq;

import com.paic.kafka.mq.consumer.ConsumerInit;
import com.paic.kafka.mq.producer.ProducerApi;

/**
 * Created by zhangkh on 2016/6/14.
 */
public class MessageSendTest {
    public static void main(String[] args) {
        new ConsumerInit().consumerServicerStart();
        ProducerApi producerApi = new ProducerApi();
        producerApi.userWinningPraiseSend("key", "message");
    }
}
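Note that consumerServicerStart() returns as soon as the polling threads are submitted, so the send that follows it races with consumer startup; and because the executor threads are non-daemon, the JVM keeps running after main() returns until Consumer.shutdown() is called.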



/************ Consumer.properties ************/



# Warning: this file should contain only Kafka consumer configuration.

#######################Consumer########################
# comma separated host:port pairs, each corresponding to a zk
##zookeeper.connect=172.31.10.3:2181,172.31.10.4:2181,172.31.10.5:2181
zookeeper.connect=kzk1:2181

# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

# If the consumer fails to heartbeat to ZooKeeper for this period of time it is considered dead and a rebalance will occur.
zookeeper.session.timeout.ms=6000

# How far a ZK follower can be behind a ZK leader
zookeeper.sync.time.ms=2000

# consumer group id
group.id=interAction_v2

# If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer.
auto.commit.enable=true

# The frequency in ms that the consumer offsets are committed to zookeeper.
auto.commit.interval.ms=6000

# smallest : automatically reset the offset to the smallest offset
auto.offset.reset=smallest

# Max number of message chunks buffered for consumption. Each chunk can be up to fetch.message.max.bytes.
queued.max.message.chunks=5

# When a new consumer joins a consumer group the set of consumers attempt to "rebalance" the load to assign partitions to each consumer.
rebalance.max.retries=5

# The minimum amount of data the server should return for a fetch request
# Under test
fetch.min.bytes=5242880
# The maximum amount of time the server will block before answering the fetch request
# if there isn't sufficient data to immediately satisfy fetch.min.bytes
# Under test
fetch.wait.max.ms=1000



/***************************/



/************ Producer.properties ************/
# Warning: this file should contain only Kafka producer configuration.

#######################Producer########################

############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster
##metadata.broker.list=172.31.10.3:9092,172.31.10.4:9092,172.31.10.5:9092
metadata.broker.list=kzk1:9091
# name of the partitioner class for partitioning events; default partition spreads data randomly
# partitioner.class=example.producer.SimplePartitioner

# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync

# specify the compression codec for all data generated: none, gzip, snappy, lz4.
# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
compression.codec=none

# message encoder
serializer.class=kafka.serializer.StringEncoder

# The serializer class for keys (defaults to the same as for messages if nothing is given).
key.serializer.class=kafka.serializer.StringEncoder

# This value controls when a produce request is considered completed
request.required.acks=1

# This property will cause the producer to automatically retry a failed send request. 
# This property specifies the number of retries when such failures occur.
message.send.max.retries=3

# The producer generally refreshes the topic metadata from brokers when there is a failure
topic.metadata.refresh.interval.ms=600000

/***************************/


/************ Topic.properties ************/
## dev environment, test environment, production environment
UserWinningPraise=PRAISE

 


Original post: http://www.cnblogs.com/molyeo/p/5585854.html
