标签:
KAFKA安装和使用
WINDOWS:
准备软件:kafka_2.11-0.8.2.0.tgz
安装步骤:
1. 创建目录D:\kafka,将kafka_2.11-0.8.2.0.tgz解压到该目录下,解压两份,并分别重命名为kafka1和kafka2,然后在这两个目录下分别创建日志目录kafka1/log/logs和kafka2/log/logs
2. 在D:\kafka目录下创建config文件夹,将KAFKA_HOME/config/log4j.properties复制到该目录下。
3. 将KAFKA_HOME/bin/windows下的所有文件复制到KAFKA_HOME/bin,对kafka1和kafka2做同样的操作。
4. 修改kafka1的KAFKA_HOME/bin/kafka-run-class.bat,对kafka2做同样的修改
a. 修改set BASE_DIR=%CD% 为set BASE_DIR=D:\kafka\kafka1
b. 替换
rem Classpath addition for kafka-core dependencies
for %%i in (%BASE_DIR%\core\build\dependant-libs-%SCALA_VERSION%\*.jar) do (
call :concat %%i
)
rem Classpath addition for kafka-perf dependencies
for %%i in (%BASE_DIR%\perf\build\dependant-libs-%SCALA_VERSION%\*.jar) do (
call :concat %%i
)
rem Classpath addition for kafka-clients
for %%i in (%BASE_DIR%\clients\build\libs\kafka-clients-*.jar) do (
call :concat %%i
)
rem Classpath addition for kafka-examples
for %%i in (%BASE_DIR%\examples\build\libs\kafka-examples-*.jar) do (
call :concat %%i
)
rem Classpath addition for contrib/hadoop-consumer
for %%i in (%BASE_DIR%\contrib\hadoop-consumer\build\libs\kafka-hadoop-consumer-*.jar) do (
call :concat %%i
)
rem Classpath addition for contrib/hadoop-producer
for %%i in (%BASE_DIR%\contrib\hadoop-producer\build\libs\kafka-hadoop-producer-*.jar) do (
call :concat %%i
)
rem Classpath addition for release
for %%i in (%BASE_DIR%\libs\*.jar) do (
call :concat %%i
)
rem Classpath addition for core
for %%i in (%BASE_DIR%\core\build\libs\kafka_%SCALA_BINARY_VERSION%*.jar) do (
call :concat %%i
)
为:
for %%i in (%BASE_DIR%\libs\*.jar) do (
call :concat %%i
)
如果是jdk1.7,还需要删掉-XX:+UseCompressedOops参数,
即修改 set KAFKA_JVM_PERFORMANCE_OPTS=-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true
为: set KAFKA_JVM_PERFORMANCE_OPTS=-server -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true
5. 修改kafka1和kafka2的KAFKA_HOME/config/server.properties
Kafka1的server.properties,修改如下内容:
broker.id=1
port=9092
host.name=127.0.0.1
#注意:properties文件中反斜杠是转义符,Windows路径请使用/(或写成\\)
log.dirs=D:/kafka/kafka1/log/logs
zookeeper.connect=localhost:2181
kafka2的server.properties,修改如下内容:
broker.id=9094
port=9093
host.name=127.0.0.1
#注意:properties文件中反斜杠是转义符,Windows路径请使用/(或写成\\)
log.dirs=D:/kafka/kafka2/log/logs
zookeeper.connect=localhost:2181
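6. 启动(需先启动zookeeper,即server.properties中zookeeper.connect所指向的localhost:2181;kafka2同理,进入D:\kafka\kafka2\bin执行相同命令):
cd D:\kafka\kafka1\bin
kafka-server-start.bat ..\config\server.properties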
Linux安装
准备: kafka_2.11-0.8.2.0.tgz
安装:
1. 创建/home/kafka目录及日志目录
mkdir -p /home/kafka
mkdir -p /home/kafka/log/logs
2. 解压kafka_2.11-0.8.2.0.tgz 到/home/kafka目录
3. 修改server.properties文件
#kafka broker标识,集群环境该值不可重复。
broker.id=1
#kafka集群端口号,consumer和producer将连接该端口
port=9092
#kafka日志文件存放路径
log.dirs=/home/kafka/log/logs
#zookeeper访问地址,多个地址用','隔开
zookeeper.connect=localhost:2181
对其他机器做同样的操作:将KAFKA_HOME scp到其他机器,并修改server.properties中的broker.id,保证各机器的值不重复
4. 启动
./kafka-server-start.sh ../config/server.properties
Java测试代码
producer.properties:
metadata.broker.list=localhost:9092,localhost:9093
zk.connect=localhost:2181
producer.type=sync
compression.codec=none
serializer.class=kafka.serializer.StringEncoder
LogProducer:
package com.apche.kafka.test;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class LogProducer {
privateProducer<String, String> inner;
publicLogProducer() throws Exception {
Propertiesproperties = new Properties();
properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));
// Propertiesproperties = new Properties();
// properties.put("zk.connect","127.0.0.1:2181");
// properties.put("metadata.broker.list","localhost:9092,localhost:9093");
// properties.put("serializer.class","kafka.serializer.StringEncoder");
ProducerConfigproducerConfig = new ProducerConfig(properties);
ProducerConfigconfig = new ProducerConfig(properties);
inner= new Producer<String, String>(config);
}
publicvoid send(String topicName, String message) {
if(topicName == null || message == null) {
return;
}
KeyedMessage<String,String> km = new KeyedMessage<String, String>(topicName, message);
//如果具有多个partitions,请使用newKeyedMessage(String topicName,Kkey,V value).
inner.send(km);
}
publicvoid send(String topicName, Collection<String> messages) {
if(topicName == null || messages == null) {
return;
}
if(messages.isEmpty()) {
return;
}
List<KeyedMessage<String,String>> kms = new ArrayList<KeyedMessage<String, String>>();
for(String entry : messages) {
KeyedMessage<String,String> km = new KeyedMessage<String, String>(
topicName,entry);
kms.add(km);
}
inner.send(kms);
}
publicvoid close() {
inner.close();
}
/**
* @param args
*/
publicstatic void main(String[] args) {
LogProducerproducer = null;
try{
producer= new LogProducer();
inti = 0;
while(true) {
producer.send("test-topic","this is a sample" + i);
System.out.println("sendthis is a sample"+i);
i++;
Thread.sleep(2000);
}
}catch (Exception e) {
e.printStackTrace();
}finally {
if(producer != null) {
producer.close();
}
}
}
}
consumer.properties
zookeeper.connect=127.0.0.1:2181
##,127.0.0.1:2182,127.0.0.1:2183
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
# consumer group id
group.id=test-group
# consumer timeout
#consumer.timeout.ms=5000
auto.commit.enable=false
auto.commit.interval.ms=60000
LogConsumer:
package com.apche.kafka.test;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
public class LogConsumer {
private ConsumerConfig config;
private String topic;
private int partitionsNum;
private MessageExecutor executor;
private ConsumerConnector connector;
private ExecutorService threadPool;
public LogConsumer(String topic,int partitionsNum,MessageExecutor executor)throws Exception{
Properties properties = new Properties();
properties.load(ClassLoader.getSystemResourceAsStream("consumer.properties"));
config = new ConsumerConfig(properties);
this.topic = topic;
this.partitionsNum = partitionsNum;
this.executor = executor;
}
public void start() throws Exception{
connector = Consumer.createJavaConsumerConnector(config);
Map<String,Integer> topics = new HashMap<String,Integer>();
topics.put(topic, partitionsNum);
Map<String, List<KafkaStream<byte[], byte[]>>> streams= connector.createMessageStreams(topics);
List<KafkaStream<byte[], byte[]>> partitions =streams.get(topic);
threadPool = Executors.newFixedThreadPool(partitionsNum);
for(KafkaStream<byte[], byte[]> partition : partitions){
threadPool.execute(new MessageRunner(partition));
}
}
public void close(){
try{
threadPool.shutdownNow();
}catch(Exception e){
//
}finally{
connector.shutdown();
}
}
class MessageRunner implements Runnable{
private KafkaStream<byte[], byte[]> partition;
MessageRunner(KafkaStream<byte[], byte[]> partition) {
this.partition = partition;
}
public void run(){
ConsumerIterator<byte[], byte[]> it = partition.iterator();
while(it.hasNext()){
connector.commitOffsets();
//手动提交offset,当autocommit.enable=false时使用
MessageAndMetadata<byte[],byte[]> item = it.next();
System.out.println("partiton:"+ item.partition());
System.out.println("offset:" + item.offset());
executor.execute(newString(item.message()));//UTF-8,注意异常
}
}
}
interface MessageExecutor {
public void execute(String message);
}
/**
*@param args
*/
public static void main(String[] args) {
LogConsumer consumer = null;
try{
MessageExecutor executor = newMessageExecutor() {
public void execute(Stringmessage) {
System.out.println(message);
}
};
consumer = new LogConsumer("test-topic", 2, executor);
consumer.start();
}catch(Exception e){
e.printStackTrace();
}finally{
// if(consumer != null){
// consumer.close();
// }
}
}
}
标签:
原文地址:http://blog.csdn.net/mapengbo521521/article/details/43732377