码迷,mamicode.com
首页 > 其他好文 > 详细

Golang之发送消息至kafka

时间:2018-01-28 18:15:24      阅读:1426      评论:0      收藏:0      [点我收藏+]

标签:配置   sam   serve   console   require   cli   服务器   bsp   color   

windows下安装zookeeper

1、安装JAVA-JDK,从oracle下载最新的SDK安装(我用的是1.8的) 
2、安装zookeeper3.3.6,下载地址:http://apache.fayea.com/zookeeper/ 
3、重命名conf/zoo_sample.cfg 为conf/zoo.cfg 
4、编辑 conf/zoo.cfg,修改dataDir=D:\zookeeper-3.3.6\data\ 
4、运行bin/zkServer.cmd

启动结果如下:

技术分享图片

安装kafka

1、打开链接:http://kafka.apache.org/downloads.html下载kafka2.1.2 

2、打开config目录下的server.properties, 修改log.dirs为D:\kafka_logs,

3、修改advertised.host.name=服务器ip

4、启动kafka ./bin/windows/kafka-server-start.bat ./config/server.preperties

kafka链接zookeeper

kafka也提供了一个命令行消费者,接受消息并打印到标准输出。

bin/kafka-console-consumer.bat --zookeeper 127.0.0.1:2181 --topic nginx_log

golang写入kafka


package main

import (
"fmt"

"github.com/Shopify/sarama"
"time"
)

//消息写入kafka
func main() {
//初始化配置
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Return.Successes = true
//生产者
client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
if err != nil {
fmt.Println("producer close,err:", err)
return
}

defer client.Close()
var n int=0

for n<20{
n++
//创建消息
msg := &sarama.ProducerMessage{}
msg.Topic = "nginx_log"
msg.Value = sarama.StringEncoder("this is a good test,hello chaoge!!")
//发送消息
pid, offset, err := client.SendMessage(msg)
if err != nil {
fmt.Println("send message failed,", err)
return
}
fmt.Printf("pid:%v offset:%v\n,", pid, offset)
time.Sleep(10 * time.Millisecond)

}

}
 

goland运行结果:

技术分享图片

kafka收到的数据:

技术分享图片

 

Golang之发送消息至kafka

标签:配置   sam   serve   console   require   cli   服务器   bsp   color   

原文地址:https://www.cnblogs.com/pyyu/p/8371649.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!