标签:
package com.paic.kafka.mq.consumer; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import com.paic.util.PropertiesFileReader; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import org.springframework.stereotype.Component; @Component public class Consumer { private static ConsumerConnector consumer; private static String topic; private ExecutorService executor; private Integer numThreads = new Integer(1); public Consumer() { } static { consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig()); } public void shutdown() { if (consumer != null) consumer.shutdown(); if (executor != null) executor.shutdown(); try { if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly"); } } catch (InterruptedException e) { System.out.println("Interrupted during shutdown, exiting uncleanly"); } } public void run() { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(topic, numThreads); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); executor = Executors.newFixedThreadPool(numThreads.intValue()); int threadNumber = 0; for (final KafkaStream stream : streams) { executor.submit(new ConsumerThread(stream, threadNumber)); threadNumber++; } } private static ConsumerConfig createConsumerConfig() { Properties props = new Properties(); String fileName = "com/tv189/interAction/mq/config/Consumer.properties"; props = PropertiesFileReader.initialize(fileName); Properties topicProps = new Properties(); String topicFileName = "com/tv189/interAction/mq/config/Topic.properties"; topicProps = PropertiesFileReader.initialize(topicFileName); topic = (String) topicProps.getProperty("UserWinningPraise"); return new ConsumerConfig(props); } } package com.paic.kafka.mq.consumer; import org.apache.log4j.Logger; /** * @Author: zkh * @Create Date: 2015-04-03 * @Purpose: * @Modified By: * @Modified Date: * @Why & What is modified * @Version: V1.0 * */ public class ConsumerInit { Consumer consumer; Logger logger = Logger.getLogger(ConsumerInit.class.getName()); public void consumerServicerStart() { logger.info("################consumerServicerStart################"); consumer.run(); logger.info("################consumerServicerEnd################"); } } package com.paic.kafka.mq.consumer; import org.apache.log4j.Logger; /** * @Author: zkh * @Create Date: 2015-04-03 * @Purpose: * @Modified By: * @Modified Date: * @Why & What is modified * @Version: V1.0 * */ public class ConsumerInit { Consumer consumer; Logger logger = Logger.getLogger(ConsumerInit.class.getName()); public void consumerServicerStart() { logger.info("################consumerServicerStart################"); consumer.run(); logger.info("################consumerServicerEnd################"); } } package com.paic.kafka.mq.message; /** * Created by zhangkh on 2016/6/14. */ public class Message { String msg; String name; public String getAge() { return age; } public void setAge(String age) { this.age = age; } String age; public String getName() { return name; } public void setName(String name) { this.name = name; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } } package com.paic.kafka.mq.producer; import java.util.Properties; import org.apache.log4j.Logger; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; /** * @Author: zkh * @Create Date: 2015-04-02 * @Purpose: * @Modified By: * @Modified Date: * @Why & What is modified * @Version: V1.0 * */ public class Producer { Logger userWinningPraiseProducerLogger = Logger.getLogger(Producer.class.getName()); private static class SingletonHolder { private static Producer instance = new Producer(); } private kafka.javaapi.producer.Producer kafkaProducer; private String topic; public void producerInit() { Properties props = new Properties(); String fileName = "com/tv189/interAction/mq/config/Producer.properties"; props = PropertiesFileReader.initialize(fileName); ProducerConfig config = new ProducerConfig(props); kafkaProducer = new kafka.javaapi.producer.Producer(config); Properties topicProps = new Properties(); String topicFileName = "com/tv189/interAction/mq/config/Topic.properties"; topicProps = PropertiesFileReader.initialize(topicFileName); topic = (String) topicProps.getProperty("UserWinningPraise"); userWinningPraiseProducerLogger.info("*******************userWinningPraiseProducerLogger initialization success*******************"); } private Producer() { producerInit(); } public static Producer getInstance() { return SingletonHolder.instance; } public void push(String key, String message) { try { KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, key, message); kafkaProducer.send(data); userWinningPraiseProducerLogger.info("KafkaSend: " + message); } catch (Exception e) { e.printStackTrace(); } } } package com.paic.kafka.mq.producer; public class ProducerApi { public void userWinningPraiseSend(String key, String message) { Producer.getInstance().push(key,message); } } package com.paic.kafka.mq.producer; import java.io.IOException; import java.io.InputStream; import java.util.Properties; /** * @Author: zkh * @Create Date: 2015-04-02 * @Purpose:just for read properties file * @Modified By: * @Modified Date: * @Why & What is modified * @Version: V1.0 * */ public final class PropertiesFileReader { private PropertiesFileReader() { } public static Properties initialize(String filePath) { Properties prop = new Properties(); InputStream in = PropertiesFileReader.class.getClassLoader().getResourceAsStream(filePath); try { prop.load(in); } catch (IOException e) { e.printStackTrace(); } return prop; } } package com.paic.kafka.mq; import com.paic.kafka.mq.consumer.ConsumerInit; import com.paic.kafka.mq.producer.ProducerApi; /** * Created by zhangkh on 2016/6/14. */ public class MessageSendTest { public static void main(String[] args){ new ConsumerInit().consumerServicerStart(); ProducerApi producerApi=new ProducerApi(); producerApi.userWinningPraiseSend("key","message"); } } /***************************/ ##Warning:This configuration file can only write kafka consumer configuration #######################Consumer######################## # comma separated host:port pairs, each corresponding to a zk ##zookeeper.connect=172.31.10.3:2181,172.31.10.4:2181,172.31.10.5:2181 zookeeper.connect=kzk1:2181 # timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=6000 # If the consumer fails to heartbeat to ZooKeeper for this period of time it is considered dead and a rebalance will occur. zookeeper.session.timeout.ms=6000 # How far a ZK follower can be behind a ZK leader zookeeper.sync.time.ms=2000 # consumer group id group.id=interAction_v2 # If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer. auto.commit.enable=true # The frequency in ms that the consumer offsets are committed to zookeeper. auto.commit.interval.ms=6000 # smallest : automatically reset the offset to the smallest offset auto.offset.reset=smallest # Max number of message chunks buffered for consumption. Each chunk can be up to fetch.message.max.bytes. queued.max.message.chunks=5 # When a new consumer joins a consumer group the set of consumers attempt to "rebalance" the load to assign partitions to each consumer. rebalance.max.retries=5 # The minimum amount of data the server should return for a fetch request # Under test fetch.min.bytes=5242880 # The maximum amount of time the server will block before answering the fetch request # if there isn‘t sufficient data to immediately satisfy fetch.min.bytes # Under test fetch.wait.max.ms=1000 /***************************/ /***************************/ # Warning:This configuration file can only write kafka producers configuration #######################Producer######################## ############################# Producer Basics ############################# # list of brokers used for bootstrapping knowledge about the rest of the cluster ##metadata.broker.list=172.31.10.3:9092,172.31.10.4:9092,172.31.10.5:9092 metadata.broker.list=kzk1:9091 # name of the partitioner class for partitioning events; default partition spreads data randomly # partitioner.class=example.producer.SimplePartitioner # specifies whether the messages are sent asynchronously (async) or synchronously (sync) producer.type=sync # specify the compression codec for all data generated: none, gzip, snappy, lz4. # the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively compression.codec=none # message encoder serializer.class=kafka.serializer.StringEncoder # The serializer class for keys (defaults to the same as for messages if nothing is given). key.serializer.class=kafka.serializer.StringEncoder # This value controls when a produce request is considered completed request.required.acks=1 # This property will cause the producer to automatically retry a failed send request. # This property specifies the number of retries when such failures occur. message.send.max.retries=3 # The producer generally refreshes the topic metadata from brokers when there is a failure topic.metadata.refresh.interval.ms=600000 /***************************/ ##dev environment,test environment,production environment UserWinningPraise=PRAISE
标签:
原文地址:http://www.cnblogs.com/molyeo/p/5585854.html