Download: https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz
lizhitao@localhost:~$ tar -xzf kafka_2.10-0.8.1.1.tgz
lizhitao@localhost:~$ cd kafka_2.10-0.8.1.1
Configure ZooKeeper (this assumes ZooKeeper is already installed; if not, search online for installation instructions).
In the Kafka installation root, edit the server config (vim config/server.properties) and set:
zookeeper.connect=ip1:2181,ip2:2181
The three most important Kafka settings are broker.id, log.dirs, and zookeeper.connect.
The server-side config/server.properties parameters are explained at:
http://blog.csdn.net/lizhitao/article/details/25667831
# Real-world example: the Kafka config on server 192.168.1.211
broker.id=1
port=9092
host.name=192.168.1.211
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=2
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=false
zookeeper.connect=192.168.1.213:2181,192.168.1.216:2181,192.168.1.217:2181
zookeeper.connection.timeout.ms=1000000

# Real-world example: the Kafka config on server 192.168.1.210
broker.id=2
port=9092
host.name=192.168.1.210
num.network.threads=2
num.io.threads=8
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=2
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
log.cleaner.enable=false
zookeeper.connect=192.168.1.213:2181,192.168.1.216:2181,192.168.1.217:2181
zookeeper.connection.timeout.ms=1000000
Start the broker:
cd kafka_2.10-0.8.1.1
lizhitao@localhost:~$ bin/kafka-server-start.sh -daemon config/server.properties &
(For the replication experiment below, start at least two brokers: bin/kafka-server-start.sh -daemon config/server-1.properties &)
Create a topic:
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
List topics:
lizhitao@localhost:~$ bin/kafka-topics.sh --list --zookeeper localhost:2181
Here localhost is the ZooKeeper address.
Describe a topic:
bin/kafka-topics.sh --describe --zookeeper 192.168.1.8:2181 --topic test
Send a few messages to verify the setup. Start a console producer:
bin/kafka-console-producer.sh --broker-list 192.168.1.9:9092 --topic zjcTest
(Use the machine's real IP instead of localhost here, otherwise the producer fails; presumably because host.name is set in server.properties, so the broker only answers on that address.)
Example message:
{"price":"100000","userId":14615501351480021,"payType":3,"code":"AFD3B8","payTime":{"time":1457330791333,"minutes":6,"seconds":31,"hours":14,"month":2,"year":116,"timezoneOffset":-480,"day":1,"date":7},"orderId":12222096,"goodsName":"高中半年会员"}
Start a consumer:
lizhitao@localhost:~$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Delete a topic:
bin/kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic test666 --zookeeper 192.168.197.170:2181,192.168.197.171:2181
A cluster works the same way as the single-node setup; only the broker configuration files need to change.
1. Copy the single-node Kafka directory to the other machines.
2. On each machine, edit server.properties in the Kafka directory:
broker.id=1 // must be unique for each broker in the cluster (an integer >= 0)
3. Start Kafka on each machine.
Pseudo-distributed setup on a single machine
First, write a config file for each node:
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
In the copied files, set the following properties:
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
Now start the two additional nodes:
> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
Create a topic with a replication factor of 3:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Run the "describe topics" command to see what each node is doing:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
leader: the node responsible for all reads and writes for this partition; each node becomes the leader for a randomly selected share of the partitions.
replicas: all nodes that replicate the log for this partition, whether or not they are currently alive.
isr: the "in-sync" replicas, i.e. the subset of replicas that are alive and caught up with the leader.
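The same leader/replicas/isr information can also be fetched programmatically. Below is a minimal sketch using the 0.8 SimpleConsumer metadata API; the class name, client id, and broker address (the producer address used earlier) are illustrative assumptions, not part of the original article:

package cn.vko.index;
import java.util.Collections;
import kafka.cluster.Broker;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
public class TopicMetadataDemo {
public static void main(String[] args) {
// Any live broker can answer a metadata request.
SimpleConsumer consumer = new SimpleConsumer("192.168.1.9", 9092, 100000, 64 * 1024, "metadataLookup");
try {
TopicMetadataRequest request = new TopicMetadataRequest(Collections.singletonList("my-replicated-topic"));
TopicMetadataResponse response = consumer.send(request);
for (TopicMetadata topic : response.topicsMetadata()) {
for (PartitionMetadata p : topic.partitionsMetadata()) {
Broker leader = p.leader();
System.out.println("partition " + p.partitionId()
+ " leader " + (leader == null ? "none" : leader.host() + ":" + leader.port())
+ " replicas " + p.replicas().size()
+ " isr " + p.isr().size());
}
}
} finally {
consumer.close();
}
}
}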
To use Kafka from Java, add the dependency to your pom.xml:
<!-- Kafka dependency -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>${kafka.version}</version>
<exclusions>
<!-- In a real project, pull in the following jars separately instead of the versions bundled with kafka -->
<exclusion>
<artifactId>zookeeper</artifactId>
<groupId>org.apache.zookeeper</groupId>
</exclusion>
<exclusion>
<artifactId>zkclient</artifactId>
<groupId>com.101tec</groupId>
</exclusion>
<exclusion>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
Example kafka.properties:
#zookeeper.connect=192.168.1.8:2181,192.168.1.13:2181,192.168.1.16:2181
#zookeeper.connect=zkserver1.vko.cn:2181,zkserver2.vko.cn:2181,zkserver3.vko.cn:2181
zookeeper.connect=192.168.1.179:2181
metadata.broker.list=192.168.1.179:9092
#metadata.broker.list=kafka.server1.vko.cn:9092,kafka.server2.vko.cn:9092
#zookeeper.connect.timeout=15000
#zookeeper.session.timeout.ms=15000
#zookeeper.sync.time.ms=20000
#auto.commit.interval.ms=20000
#auto.offset.reset=smallest
#serializer.class=kafka.serializer.StringEncoder
#producer.type=async
#queue.buffering.max.ms=6000
group.id=llx
kafka.sellstat.topics=llx

Spring configuration:
<!-- Loads kafka.properties for Spring's property-placeholder resolution. -->
<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<list>
<value>classpath:kafka.properties</value>
</list>
</property>
</bean>
<!-- Exposes the same properties as a bean for injection in code. -->
<bean id="configProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
<property name="locations">
<list>
<value>classpath:kafka.properties</value>
</list>
</property>
</bean>
<!-- Define the receiver -->
<bean id="testReceiver" class="cn.vko.index.Receiver">
<constructor-arg index="0" value="${zookeeper.connect}" />
<constructor-arg index="1" value="${group.id}" />
<constructor-arg index="2" value="${kafka.sellstat.topics}"/>
<constructor-arg index="3" ref="testConsumer" />
</bean>
<!-- Define the message handler -->
<bean id="testConsumer" class="cn.vko.index.TestConsumer" ></bean>
package cn.vko.index;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import cn.vko.common.base.JsonMsg;
/**
* Tests sending messages to Kafka.
* @author lilixin
*/
@Controller
public class TestProducer {
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Value("#{configProperties[‘metadata.broker.list‘]}")
private String metadataBrokerList;
@Value("#{configProperties[‘kafka.sellstat.topics‘]}")
private String topic;
@ResponseBody
@RequestMapping("send")
public JsonMsg send(String msg){
logger.info("发送开始-------------------------");
VkoProducer vkoProducer =new VkoProducer(metadataBrokerList);
logger.info("连接完成-------------------------");
vkoProducer.send(topic, msg);
logger.info("发送完成-------------------------");
return new JsonMsg();
}
}
package cn.vko.index;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
/**
* Tests receiving messages from Kafka.
* @author llx
*/
@Service
public class TestConsumer implements VkoConsumer{
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void dealMsg(String msg) {
logger.info("--------kafka接收消息开始---------");
logger.info(msg);
logger.info("--------kafka接收消息结束 ---------");
}
}
package cn.vko.index;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class VkoProducer {
private Logger log = LoggerFactory.getLogger(VkoProducer.class);
private String metadataBrokerList;
private Producer<String, String> producer;
public VkoProducer(String metadataBrokerList) {
super();
if (StringUtils.isEmpty(metadataBrokerList)) {
String message = "metadataBrokerList must not be empty";
log.error(message);
throw new RuntimeException(message);
}
this.metadataBrokerList = metadataBrokerList;
// Producer configuration
Properties props = new Properties();
props.put("metadata.broker.list", metadataBrokerList);
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("key.serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
//props.put("producer.type", "async");
props.put("queue.buffering.max.ms", "5000");
props.put("queue.buffering.max.messages", "30000");
props.put("queue.enqueue.timeout.ms", "-1");
props.put("batch.num.messages", "1");
// Optional; if not set, the default partitioner is used.
//props.put("partitioner.class", "cn.vko.kafka.PartitionerDemo");
// request.required.acks turns on acknowledgements; without it the producer
// is fire-and-forget and may lose data. Valid values are 0, 1, -1; see
// http://kafka.apache.org/08/configuration.html
ProducerConfig config = new ProducerConfig(props);
producer = new Producer<String, String>(config);
}
/**
* Sends a single message to the queue.
* @param topic topic to send to
* @param msg message body
* @return "ok" on success
*/
public String send(String topic, String msg) {
log.info("向topic : "+topic + " 发送消息 ="+msg);
// Long start = System.currentTimeMillis();
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, msg);
producer.send(data);
// log.info("发送消息耗时:{}",System.currentTimeMillis()- start);
return "ok";
}
}
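For reference, a minimal sketch of driving VkoProducer outside the Spring wiring above; the class name is hypothetical, and the broker address and topic are placeholders taken from the sample kafka.properties:

package cn.vko.index;
public class ProducerDemo {
public static void main(String[] args) {
// Broker list and topic are assumptions matching the sample config above.
VkoProducer producer = new VkoProducer("192.168.1.179:9092");
producer.send("llx", "{\"orderId\":12222096,\"price\":\"100000\"}");
// VkoProducer exposes no close(), so the process exits with the
// underlying kafka.javaapi.producer.Producer still open.
}
}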
package cn.vko.index;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import cn.vko.component.pageframework.util.StringUtil;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class Receiver {
private Logger log = LoggerFactory.getLogger(Receiver.class);
private String zookeeperConnect;
private String groupId;
private String topic;
private VkoConsumer vkoConsumer;
/**
* Creates a receiver.
* @param zookeeperConnect ZK cluster address, comma separated
* @param groupId consumer group id
* @param topic topic
* @param vkoConsumer message handler
*/
public Receiver(String zookeeperConnect, String groupId, String topic,VkoConsumer vkoConsumer) {
super();
if (StringUtil.isEmpty(zookeeperConnect)) {
String message = "zookeeperConnect must not be empty";
log.error(message);
throw new RuntimeException(message);
}
if (StringUtil.isEmpty(groupId)) {
String message = "groupId must not be empty";
log.error(message);
throw new RuntimeException(message);
}
if (StringUtil.isEmpty(topic)) {
String message = "topic must not be empty";
log.error(message);
throw new RuntimeException(message);
}
if (vkoConsumer == null) {
String message = "vkoConsumer must not be null";
log.error(message);
throw new RuntimeException(message);
}
this.zookeeperConnect = zookeeperConnect;
this.groupId = groupId;
this.topic = topic;
this.vkoConsumer = vkoConsumer;
log.info("kafka vkoConsumer 创建完成:groupId:{},topic:{},zookeeperConnect:{}",groupId, topic, zookeeperConnect);
receive();
}
private void receive(){
Properties props = new Properties();
props.put("zookeeper.connect", zookeeperConnect);
props.put("group.id", groupId);
props.put("zookeeper.session.timeout.ms", "14000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
ConsumerConfig conf = new ConsumerConfig(props);
ConsumerConnector cc = Consumer.createJavaConsumerConnector(conf);
// Note: cc.shutdown() is never called here; a production version would keep
// a reference to the connector and shut it down on exit.
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
// Each topic currently has 2 partitions.
topicCountMap.put(topic,2);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = cc.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
for (final KafkaStream<byte[], byte[]> stream : streams) {
new Thread(){
public void run(){
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while(it.hasNext()){
String msg = new String(it.next().message());
try{
vkoConsumer.dealMsg(msg);
}catch(Exception e){
log.error("kafka vkoConsumer topic:{} 收到消息:{} 消费异常 xxxxxxxxxxxxxxxxxx", topic, msg,e);
}
log.info("kafka vkoConsumer topic:{} 收到消息:{}", topic, msg);
}
}
}.start();
log.info("kafka vkoConsumer 启动完成:groupId:{},topic:{},zookeeperConnect:{}",groupId, topic, zookeeperConnect);
}
log.info("kafka vkoConsumer 准备接收消息:groupId:{},topic:{},zookeeperConnect:{}",groupId, topic, zookeeperConnect);
}
}
package cn.vko.index;
public interface VkoConsumer {
public void dealMsg(String msg);
}
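Putting Receiver and VkoConsumer together without Spring, a minimal sketch; the class name is hypothetical, and the ZooKeeper address, group id, and topic are placeholders matching the sample config:

package cn.vko.index;
public class ConsumerDemo {
public static void main(String[] args) {
// A handler that just prints each message.
VkoConsumer printer = new VkoConsumer() {
@Override
public void dealMsg(String msg) {
System.out.println("received: " + msg);
}
};
// The Receiver constructor calls receive() itself, so consumption starts
// immediately on background (non-daemon) threads, which keep the JVM alive.
new Receiver("192.168.1.179:2181", "llx", "llx", printer);
}
}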
// A consumer as written in a real project (excerpt; project-specific imports omitted)
@Service
public class SellStatConsumer implements VkoConsumer{
private Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private ISellDetailService sellDetailService;
@Autowired
private ISellerService sellerService;
@Autowired(required=false)
private IIpServiceRemote ipServiceRemote;
@Autowired(required=false)
private IPhoneCityServiceRemote phoneCityServiceRemote;
@Override
public void dealMsg(String rowData) {
if (!new JsonValidator().validate(rowData)) {
logger.error("json error ...... : {}", rowData);
return;
}
logger.info("========start kafka consumer=============="+rowData);
JSONObject json = JSONObject.fromObject(rowData);
PayInfoForm form = (PayInfoForm)JSONObject.toBean(json, PayInfoForm.class);
//do something
}
}
Original article: http://www.cnblogs.com/lilixin/p/5775877.html