首页
Web开发
Windows程序
编程语言
数据库
移动开发
系统相关
微信
其他好文
会员
首页
>
其他好文
> 详细
漫游kafka实战篇之搭建Kafka开发环境
时间:
2015-12-17 12:18:23
阅读:
238
评论:
0
收藏:
0
[点我收藏+]
标签:
上篇文章中我们搭建了kafka的服务器,并可以使用Kafka的命令行工具创建topic,发送和接收消息。下面我们来搭建kafka的开发环境。
添加依赖
搭建开发环境需要引入kafka的jar包,一种方式是将Kafka安装包中lib下的jar包加入到项目的classpath中,这种比较简单了。不过我们使用另一种更加流行的方式:使用maven管理jar包依赖。
创建好maven项目后,在pom.xml中添加以下依赖:
[html]
view plain
copy
<
dependency
>
<
groupId
>org.apache.kafka
</
groupId
>
<
artifactId
>kafka_2.10
</
artifactId
>
<
version
>0.8.2.2
</
version
>
</
dependency
>
添加依赖后如果有两个jar包的依赖找不到。点击
这里
下载这两个jar包,解压后你有两种选择,第一种是使用mvn的install命令将jar包安装到本地仓库,另一种是直接将解压后的文件夹拷贝到mvn本地仓库的com文件夹下,比如我的本地仓库是d:\mvn,完成后我的目录结构是这样的:
配置程序
更新更全的API编程实例点这里:
http://blog.csdn.net/honglei915/article/details/37697655
首先是一个充当配置文件作用的接口,配置了Kafka的各种连接参数:
[java]
view plain
copy
package com.sohu.kafkademon;
public
interface KafkaProperties
{
final
static String zkConnect =
"10.22.10.139:2181";
final
static String groupId =
"group1";
final
static String topic =
"topic1";
final
static String kafkaServerURL =
"10.22.10.139";
final
static
int kafkaServerPort =
9092;
final
static
int kafkaProducerBufferSize =
64 *
1024;
final
static
int connectionTimeOut =
20000;
final
static
int reconnectInterval =
10000;
final
static String topic2 =
"topic2";
final
static String topic3 =
"topic3";
final
static String clientId =
"SimpleConsumerDemoClient";
}
producer
[java]
view plain
copy
package com.sohu.kafkademon;
import java.util.Properties;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
* @author leicui bourne_cui@163.com
*/
public
class KafkaProducer
extends Thread
{
private
final kafka.javaapi.producer.Producer<Integer, String> producer;
private
final String topic;
private
final Properties props =
new Properties();
public KafkaProducer(String topic)
{
props.put(
"serializer.class",
"kafka.serializer.StringEncoder");
props.put(
"metadata.broker.list",
"10.22.10.139:9092");
producer =
new kafka.javaapi.producer.Producer<Integer, String>(
new ProducerConfig(props));
this.topic = topic;
}
@Override
public
void run() {
int messageNo =
1;
while (
true)
{
String messageStr =
new String(
"Message_" + messageNo);
System.out.println(
"Send:" + messageStr);
producer.send(
new KeyedMessage<Integer, String>(topic, messageStr));
messageNo++;
try {
sleep(
3000);
}
catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
consumer
[java]
view plain
copy
package com.sohu.kafkademon;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
/**
* @author leicui bourne_cui@163.com
*/
public
class KafkaConsumer
extends Thread
{
private
final ConsumerConnector consumer;
private
final String topic;
public KafkaConsumer(String topic)
{
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig());
this.topic = topic;
}
private
static ConsumerConfig createConsumerConfig()
{
Properties props =
new Properties();
props.put(
"zookeeper.connect", KafkaProperties.zkConnect);
props.put(
"group.id", KafkaProperties.groupId);
props.put(
"zookeeper.session.timeout.ms",
"40000");
props.put(
"zookeeper.sync.time.ms",
"200");
props.put(
"auto.commit.interval.ms",
"1000");
return
new ConsumerConfig(props);
}
@Override
public
void run() {
Map<String, Integer> topicCountMap =
new HashMap<String, Integer>();
topicCountMap.put(topic,
new Integer(
1));
Map<String, List<KafkaStream<
byte[],
byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<
byte[],
byte[]> stream = consumerMap.get(topic).get(
0);
ConsumerIterator<
byte[],
byte[]> it = stream.iterator();
while (it.hasNext()) {
System.out.println(
"receive:" +
new String(it.next().message()));
try {
sleep(
3000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
简单的发送接收
运行下面这个程序,就可以进行简单的发送接收消息了:
[java]
view plain
copy
package com.sohu.kafkademon;
/**
* @author leicui bourne_cui@163.com
*/
public
class KafkaConsumerProducerDemo
{
public
static
void main(String[] args)
{
KafkaProducer producerThread =
new KafkaProducer(KafkaProperties.topic);
producerThread.start();
KafkaConsumer consumerThread =
new KafkaConsumer(KafkaProperties.topic);
consumerThread.start();
}
}
高级别的consumer
下面是比较负载的发送接收的程序:
[java]
view plain
copy
package com.sohu.kafkademon;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
/**
* @author leicui bourne_cui@163.com
*/
public
class KafkaConsumer
extends Thread
{
private
final ConsumerConnector consumer;
private
final String topic;
public KafkaConsumer(String topic)
{
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig());
this.topic = topic;
}
private
static ConsumerConfig createConsumerConfig()
{
Properties props =
new Properties();
props.put(
"zookeeper.connect", KafkaProperties.zkConnect);
props.put(
"group.id", KafkaProperties.groupId);
props.put(
"zookeeper.session.timeout.ms",
"40000");
props.put(
"zookeeper.sync.time.ms",
"200");
props.put(
"auto.commit.interval.ms",
"1000");
return
new ConsumerConfig(props);
}
@Override
public
void run() {
Map<String, Integer> topicCountMap =
new HashMap<String, Integer>();
topicCountMap.put(topic,
new Integer(
1));
Map<String, List<KafkaStream<
byte[],
byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
KafkaStream<
byte[],
byte[]> stream = consumerMap.get(topic).get(
0);
ConsumerIterator<
byte[],
byte[]> it = stream.iterator();
while (it.hasNext()) {
System.out.println(
"receive:" +
new String(it.next().message()));
try {
sleep(
3000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
漫游kafka实战篇之搭建Kafka开发环境
标签:
原文地址:http://www.cnblogs.com/shijiaoyun/p/5053321.html
踩
(
0
)
赞
(
0
)
举报
评论
一句话评论(
0
)
登录后才能评论!
分享档案
更多>
2021年07月29日 (22)
2021年07月28日 (40)
2021年07月27日 (32)
2021年07月26日 (79)
2021年07月23日 (29)
2021年07月22日 (30)
2021年07月21日 (42)
2021年07月20日 (16)
2021年07月19日 (90)
2021年07月16日 (35)
周排行
更多
分布式事务
2021-07-29
OpenStack云平台命令行登录账户
2021-07-29
getLastRowNum()与getLastCellNum()/getPhysicalNumberOfRows()与getPhysicalNumberOfCells()
2021-07-29
【K8s概念】CSI 卷克隆
2021-07-29
vue3.0使用ant-design-vue进行按需加载原来这么简单
2021-07-29
stack栈
2021-07-29
抽奖动画 - 大转盘抽奖
2021-07-29
PPT写作技巧
2021-07-29
003-核心技术-IO模型-NIO-基于NIO群聊示例
2021-07-29
Bootstrap组件2
2021-07-29
友情链接
兰亭集智
国之画
百度统计
站长统计
阿里云
chrome插件
新版天听网
关于我们
-
联系我们
-
留言反馈
© 2014
mamicode.com
版权所有 联系我们:gaon5@hotmail.com
迷上了代码!