标签:
package com.paic.kafka.mq.consumer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.paic.util.PropertiesFileReader;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.springframework.stereotype.Component;
@Component
public class Consumer {
private static ConsumerConnector consumer;
private static String topic;
private ExecutorService executor;
private Integer numThreads = new Integer(1);
public Consumer() {
}
static {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
}
public void shutdown() {
if (consumer != null)
consumer.shutdown();
if (executor != null)
executor.shutdown();
try {
if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
}
} catch (InterruptedException e) {
System.out.println("Interrupted during shutdown, exiting uncleanly");
}
}
public void run() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, numThreads);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
executor = Executors.newFixedThreadPool(numThreads.intValue());
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerThread(stream, threadNumber));
threadNumber++;
}
}
private static ConsumerConfig createConsumerConfig() {
Properties props = new Properties();
String fileName = "com/tv189/interAction/mq/config/Consumer.properties";
props = PropertiesFileReader.initialize(fileName);
Properties topicProps = new Properties();
String topicFileName = "com/tv189/interAction/mq/config/Topic.properties";
topicProps = PropertiesFileReader.initialize(topicFileName);
topic = (String) topicProps.getProperty("UserWinningPraise");
return new ConsumerConfig(props);
}
}
package com.paic.kafka.mq.consumer;
import org.apache.log4j.Logger;
/**
* @Author: zkh
* @Create Date: 2015-04-03
* @Purpose:
* @Modified By:
* @Modified Date:
* @Why & What is modified
* @Version: V1.0
*
*/
public class ConsumerInit {
Consumer consumer;
Logger logger = Logger.getLogger(ConsumerInit.class.getName());
public void consumerServicerStart() {
logger.info("################consumerServicerStart################");
consumer.run();
logger.info("################consumerServicerEnd################");
}
}
package com.paic.kafka.mq.consumer;
import org.apache.log4j.Logger;
/**
* @Author: zkh
* @Create Date: 2015-04-03
* @Purpose:
* @Modified By:
* @Modified Date:
* @Why & What is modified
* @Version: V1.0
*
*/
public class ConsumerInit {
Consumer consumer;
Logger logger = Logger.getLogger(ConsumerInit.class.getName());
public void consumerServicerStart() {
logger.info("################consumerServicerStart################");
consumer.run();
logger.info("################consumerServicerEnd################");
}
}
package com.paic.kafka.mq.message;
/**
* Created by zhangkh on 2016/6/14.
*/
public class Message {
String msg;
String name;
public String getAge() {
return age;
}
public void setAge(String age) {
this.age = age;
}
String age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
}
package com.paic.kafka.mq.producer;
import java.util.Properties;
import org.apache.log4j.Logger;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
* @Author: zkh
* @Create Date: 2015-04-02
* @Purpose:
* @Modified By:
* @Modified Date:
* @Why & What is modified
* @Version: V1.0
*
*/
public class Producer {
Logger userWinningPraiseProducerLogger = Logger.getLogger(Producer.class.getName());
private static class SingletonHolder {
private static Producer instance = new Producer();
}
private kafka.javaapi.producer.Producer kafkaProducer;
private String topic;
public void producerInit() {
Properties props = new Properties();
String fileName = "com/tv189/interAction/mq/config/Producer.properties";
props = PropertiesFileReader.initialize(fileName);
ProducerConfig config = new ProducerConfig(props);
kafkaProducer = new kafka.javaapi.producer.Producer(config);
Properties topicProps = new Properties();
String topicFileName = "com/tv189/interAction/mq/config/Topic.properties";
topicProps = PropertiesFileReader.initialize(topicFileName);
topic = (String) topicProps.getProperty("UserWinningPraise");
userWinningPraiseProducerLogger.info("*******************userWinningPraiseProducerLogger initialization success*******************");
}
private Producer() {
producerInit();
}
public static Producer getInstance() {
return SingletonHolder.instance;
}
public void push(String key, String message) {
try {
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, key, message);
kafkaProducer.send(data);
userWinningPraiseProducerLogger.info("KafkaSend: " + message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
package com.paic.kafka.mq.producer;
public class ProducerApi {
public void userWinningPraiseSend(String key, String message) {
Producer.getInstance().push(key,message);
}
}
package com.paic.kafka.mq.producer;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
* @Author: zkh
* @Create Date: 2015-04-02
* @Purpose:just for read properties file
* @Modified By:
* @Modified Date:
* @Why & What is modified
* @Version: V1.0
*
*/
public final class PropertiesFileReader {
private PropertiesFileReader() {
}
public static Properties initialize(String filePath) {
Properties prop = new Properties();
InputStream in = PropertiesFileReader.class.getClassLoader().getResourceAsStream(filePath);
try {
prop.load(in);
} catch (IOException e) {
e.printStackTrace();
}
return prop;
}
}
package com.paic.kafka.mq;
import com.paic.kafka.mq.consumer.ConsumerInit;
import com.paic.kafka.mq.producer.ProducerApi;
/**
* Created by zhangkh on 2016/6/14.
*/
public class MessageSendTest {
public static void main(String[] args){
new ConsumerInit().consumerServicerStart();
ProducerApi producerApi=new ProducerApi();
producerApi.userWinningPraiseSend("key","message");
}
}
/***************************/
##Warning:This configuration file can only write kafka consumer configuration
#######################Consumer########################
# comma separated host:port pairs, each corresponding to a zk
##zookeeper.connect=172.31.10.3:2181,172.31.10.4:2181,172.31.10.5:2181
zookeeper.connect=kzk1:2181
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
# If the consumer fails to heartbeat to ZooKeeper for this period of time it is considered dead and a rebalance will occur.
zookeeper.session.timeout.ms=6000
# How far a ZK follower can be behind a ZK leader
zookeeper.sync.time.ms=2000
# consumer group id
group.id=interAction_v2
# If true, periodically commit to ZooKeeper the offset of messages already fetched by the consumer.
auto.commit.enable=true
# The frequency in ms that the consumer offsets are committed to zookeeper.
auto.commit.interval.ms=6000
# smallest : automatically reset the offset to the smallest offset
auto.offset.reset=smallest
# Max number of message chunks buffered for consumption. Each chunk can be up to fetch.message.max.bytes.
queued.max.message.chunks=5
# When a new consumer joins a consumer group the set of consumers attempt to "rebalance" the load to assign partitions to each consumer.
rebalance.max.retries=5
# The minimum amount of data the server should return for a fetch request
# Under test
fetch.min.bytes=5242880
# The maximum amount of time the server will block before answering the fetch request
# if there isn‘t sufficient data to immediately satisfy fetch.min.bytes
# Under test
fetch.wait.max.ms=1000
/***************************/
/***************************/
# Warning:This configuration file can only write kafka producers configuration
#######################Producer########################
############################# Producer Basics #############################
# list of brokers used for bootstrapping knowledge about the rest of the cluster
##metadata.broker.list=172.31.10.3:9092,172.31.10.4:9092,172.31.10.5:9092
metadata.broker.list=kzk1:9091
# name of the partitioner class for partitioning events; default partition spreads data randomly
# partitioner.class=example.producer.SimplePartitioner
# specifies whether the messages are sent asynchronously (async) or synchronously (sync)
producer.type=sync
# specify the compression codec for all data generated: none, gzip, snappy, lz4.
# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively
compression.codec=none
# message encoder
serializer.class=kafka.serializer.StringEncoder
# The serializer class for keys (defaults to the same as for messages if nothing is given).
key.serializer.class=kafka.serializer.StringEncoder
# This value controls when a produce request is considered completed
request.required.acks=1
# This property will cause the producer to automatically retry a failed send request.
# This property specifies the number of retries when such failures occur.
message.send.max.retries=3
# The producer generally refreshes the topic metadata from brokers when there is a failure
topic.metadata.refresh.interval.ms=600000
/***************************/
##dev environment,test environment,production environment
UserWinningPraise=PRAISE
标签:
原文地址:http://www.cnblogs.com/molyeo/p/5585854.html