
Kafka Installation and Usage

Windows Installation

 

Software required: kafka_2.11-0.8.2.0.tgz

Installation steps:

1. Create the directory D:\kafka and extract kafka_2.11-0.8.2.0.tgz into it twice; rename the two copies kafka1 and kafka2. Under each copy, create the log directories kafka1\log\logs and kafka2\log\logs.

2. Create a config folder under D:\kafka and copy KAFKA_HOME/config/log4j.properties into it.

3. Copy all files from KAFKA_HOME/bin/windows into KAFKA_HOME/bin; do this for both kafka1 and kafka2.

4. Edit kafka1's KAFKA_HOME/bin/kafka-run-class.bat (then make the same changes for kafka2):

a. Change set BASE_DIR=%CD% to set BASE_DIR=D:\kafka\kafka1 (D:\kafka\kafka2 for kafka2).

b. Replace:

rem Classpath addition for kafka-core dependencies
for %%i in (%BASE_DIR%\core\build\dependant-libs-%SCALA_VERSION%\*.jar) do (
    call :concat %%i
)

rem Classpath addition for kafka-perf dependencies
for %%i in (%BASE_DIR%\perf\build\dependant-libs-%SCALA_VERSION%\*.jar) do (
    call :concat %%i
)

rem Classpath addition for kafka-clients
for %%i in (%BASE_DIR%\clients\build\libs\kafka-clients-*.jar) do (
    call :concat %%i
)

rem Classpath addition for kafka-examples
for %%i in (%BASE_DIR%\examples\build\libs\kafka-examples-*.jar) do (
    call :concat %%i
)

rem Classpath addition for contrib/hadoop-consumer
for %%i in (%BASE_DIR%\contrib\hadoop-consumer\build\libs\kafka-hadoop-consumer-*.jar) do (
    call :concat %%i
)

rem Classpath addition for contrib/hadoop-producer
for %%i in (%BASE_DIR%\contrib\hadoop-producer\build\libs\kafka-hadoop-producer-*.jar) do (
    call :concat %%i
)

rem Classpath addition for release
for %%i in (%BASE_DIR%\libs\*.jar) do (
    call :concat %%i
)

rem Classpath addition for core
for %%i in (%BASE_DIR%\core\build\libs\kafka_%SCALA_BINARY_VERSION%*.jar) do (
    call :concat %%i
)

with:

for %%i in (%BASE_DIR%\libs\*.jar) do (
    call :concat %%i
)

 

If you are running JDK 1.7, also remove the -XX:+UseCompressedOops flag, i.e. change:

set KAFKA_JVM_PERFORMANCE_OPTS=-server -XX:+UseCompressedOops -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true

to:

set KAFKA_JVM_PERFORMANCE_OPTS=-server -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC -Djava.awt.headless=true

 

5. Edit KAFKA_HOME/config/server.properties for both kafka1 and kafka2.

In kafka1's server.properties, set:

broker.id=1

port=9092

host.name=127.0.0.1

log.dirs=D:\kafka\kafka1\log\logs

zookeeper.connect=localhost:2181

In kafka2's server.properties, set:

broker.id=9094

port=9093

host.name=127.0.0.1

log.dirs=D:\kafka\kafka2\log\logs

zookeeper.connect=localhost:2181

 

 

6. Start each broker:

cd D:\kafka\kafka1\bin
kafka-server-start.bat ..\config\server.properties

Repeat from D:\kafka\kafka2\bin to start the second broker.
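Both brokers must be running before the Java examples below are used. It also helps to create the test topic explicitly so that it gets 2 partitions spread across the 2 brokers (otherwise auto-creation applies the broker defaults). A minimal sketch, not part of the original article, invoking kafka.admin.TopicCommand through the kafka-run-class.bat script already edited above; the topic name test-topic matches the Java code below:

kafka-run-class.bat kafka.admin.TopicCommand --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic test-topic
kafka-run-class.bat kafka.admin.TopicCommand --describe --zookeeper localhost:2181 --topic test-topic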

 

Linux Installation

Software required: kafka_2.11-0.8.2.0.tgz

Installation:

1. Create the /home/kafka directory and the log directory (log.dirs must point at a directory, so mkdir rather than touch):

mkdir -p /home/kafka
mkdir -p /home/kafka/log/logs

2. Extract kafka_2.11-0.8.2.0.tgz into the /home/kafka directory.

3. Edit the server.properties file:

# Kafka broker id; must be unique within a cluster.
broker.id=1
# Port the broker listens on; producers and consumers connect to it.
port=9092
# Directory where Kafka stores its log (data) files.
log.dirs=/home/kafka/log/logs
# ZooKeeper connection string; separate multiple addresses with ','.
zookeeper.connect=localhost:2181
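For a multi-node ZooKeeper ensemble, zookeeper.connect takes a comma-separated list; a hypothetical example (host names are placeholders):

zookeeper.connect=zk1:2181,zk2:2181,zk3:2181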

 

Do the same on the other machines, or scp KAFKA_HOME to them; edit each server.properties so that broker.id is not repeated.

 

4. Start the broker (from the bin directory):

./kafka-server-start.sh ../config/server.properties
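Before wiring up the Java clients, the broker can be smoke-tested end to end with the console tools shipped in bin; run each command in its own terminal (test-topic matches the Java code below):

./kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test-topic --from-beginning

Lines typed into the producer terminal should show up in the consumer terminal.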

 

Java Test Code

 

producer.properties

 

metadata.broker.list=localhost:9092,localhost:9093

zk.connect=localhost:2181

producer.type=sync

compression.codec=none

serializer.class=kafka.serializer.StringEncoder
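producer.type=sync makes every send block until the broker acknowledges it. If throughput matters more than losing a few buffered messages on a crash, the 0.8 producer can batch asynchronously; a hedged variant using standard 0.8 producer settings (the values shown are the defaults):

producer.type=async
# flush a batch after at most 5 s or 200 queued messages, whichever comes first
queue.buffering.max.ms=5000
batch.num.messages=200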

 

LogProducer

 

package com.apche.kafka.test;

 

import java.util.ArrayList;

import java.util.Collection;

import java.util.List;

import java.util.Properties;

 

import kafka.javaapi.producer.Producer;

import kafka.producer.KeyedMessage;

import kafka.producer.ProducerConfig;

 

public class LogProducer {

    private Producer<String, String> inner;

    public LogProducer() throws Exception {
        Properties properties = new Properties();
        properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));

        // Alternatively, build the configuration in code:
        // Properties properties = new Properties();
        // properties.put("zk.connect", "127.0.0.1:2181");
        // properties.put("metadata.broker.list", "localhost:9092,localhost:9093");
        // properties.put("serializer.class", "kafka.serializer.StringEncoder");

        ProducerConfig config = new ProducerConfig(properties);
        inner = new Producer<String, String>(config);
    }

    public void send(String topicName, String message) {
        if (topicName == null || message == null) {
            return;
        }
        // With multiple partitions, use new KeyedMessage<K, V>(String topic, K key, V value)
        // so that the key determines the target partition.
        KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName, message);
        inner.send(km);
    }

    public void send(String topicName, Collection<String> messages) {
        if (topicName == null || messages == null) {
            return;
        }
        if (messages.isEmpty()) {
            return;
        }
        List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();
        for (String entry : messages) {
            KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName, entry);
            kms.add(km);
        }
        inner.send(kms);
    }

    public void close() {
        inner.close();
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        LogProducer producer = null;
        try {
            producer = new LogProducer();
            int i = 0;
            while (true) {
                producer.send("test-topic", "this is a sample" + i);
                System.out.println("send this is a sample" + i);
                i++;
                Thread.sleep(2000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null) {
                producer.close();
            }
        }
    }
}
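To compile and run LogProducer outside an IDE, the jars under KAFKA_HOME/libs must be on the classpath, and producer.properties must sit on the classpath root, because ClassLoader.getSystemResourceAsStream() resolves it from there. A hypothetical invocation (the KAFKA_HOME value and source layout are assumptions; adjust to your setup):

javac -cp "$KAFKA_HOME/libs/*" com/apche/kafka/test/LogProducer.java
java -cp ".:$KAFKA_HOME/libs/*" com.apche.kafka.test.LogProducer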

 

consumer.properties

 

zookeeper.connect=127.0.0.1:2181
## for a cluster, append more servers: ,127.0.0.1:2182,127.0.0.1:2183
# timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
# consumer group id
group.id=test-group
# consumer timeout
#consumer.timeout.ms=5000
auto.commit.enable=false
auto.commit.interval.ms=60000
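With auto.commit.enable=false, offsets are only saved when the code calls connector.commitOffsets(), as LogConsumer below does. To let the client commit periodically instead, enable auto commit (auto.commit.interval.ms above then controls the period):

auto.commit.enable=true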

 

LogConsumer

 

package com.apche.kafka.test;

 

 

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class LogConsumer {

    private ConsumerConfig config;
    private String topic;
    private int partitionsNum;
    private MessageExecutor executor;
    private ConsumerConnector connector;
    private ExecutorService threadPool;

    public LogConsumer(String topic, int partitionsNum, MessageExecutor executor) throws Exception {
        Properties properties = new Properties();
        properties.load(ClassLoader.getSystemResourceAsStream("consumer.properties"));
        config = new ConsumerConfig(properties);
        this.topic = topic;
        this.partitionsNum = partitionsNum;
        this.executor = executor;
    }

    public void start() throws Exception {
        connector = Consumer.createJavaConsumerConnector(config);
        Map<String, Integer> topics = new HashMap<String, Integer>();
        topics.put(topic, partitionsNum);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);
        List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);
        threadPool = Executors.newFixedThreadPool(partitionsNum);
        for (KafkaStream<byte[], byte[]> partition : partitions) {
            threadPool.execute(new MessageRunner(partition));
        }
    }

    public void close() {
        try {
            threadPool.shutdownNow();
        } catch (Exception e) {
            //
        } finally {
            connector.shutdown();
        }
    }

    class MessageRunner implements Runnable {
        private KafkaStream<byte[], byte[]> partition;

        MessageRunner(KafkaStream<byte[], byte[]> partition) {
            this.partition = partition;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = partition.iterator();
            while (it.hasNext()) {
                // Commit offsets manually; needed because auto.commit.enable=false.
                connector.commitOffsets();
                MessageAndMetadata<byte[], byte[]> item = it.next();
                System.out.println("partition:" + item.partition());
                System.out.println("offset:" + item.offset());
                // Assumes UTF-8 payloads; beware of decoding exceptions.
                executor.execute(new String(item.message()));
            }
        }
    }

    interface MessageExecutor {

        public void execute(String message);
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        LogConsumer consumer = null;
        try {
            MessageExecutor executor = new MessageExecutor() {

                public void execute(String message) {
                    System.out.println(message);
                }
            };
            consumer = new LogConsumer("test-topic", 2, executor);
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Left running on purpose; call consumer.close() to shut the consumer down.
            // if (consumer != null) {
            //     consumer.close();
            // }
        }
    }
}
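Running the consumer mirrors the producer, with consumer.properties on the classpath root; again a hypothetical invocation under the same assumed layout:

javac -cp "$KAFKA_HOME/libs/*" com/apche/kafka/test/LogConsumer.java
java -cp ".:$KAFKA_HOME/libs/*" com.apche.kafka.test.LogConsumer

With LogProducer running, the consumer should print a partition, an offset, and the message payload roughly every two seconds.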

 


Original article: http://blog.csdn.net/mapengbo521521/article/details/43732377
