标签:beanstalkd go golang 消息队列 messagequeue
最近需要引入一种新的消息队列。这个队列最好具备专业、简单、消息不丢失等特性,同时又不会引入过多的复杂性。Beanstalkd 看起来符合这些要求,主页在这: http://kr.github.io/beanstalkd
写了个调用例子如下.
/* xcl (2015-8-15) 多TubeName 多消费者 */ package main import ( "fmt" "github.com/kr/beanstalk" "runtime" "strings" "time" ) var ( TubeName1 string = "channel1" TubeName2 string = "channel2" ) func Producer(fname, tubeName string) { if fname == "" || tubeName == "" { return } c, err := beanstalk.Dial("tcp", "127.0.0.1:11300") if err != nil { panic(err) } defer c.Close() c.Tube.Name = tubeName c.TubeSet.Name[tubeName] = true fmt.Println(fname, " [Producer] tubeName:", tubeName, " c.Tube.Name:", c.Tube.Name) for i := 0; i < 5; i++ { msg := fmt.Sprintf("for %s %d", tubeName, i) c.Put([]byte(msg), 30, 0, 120*time.Second) fmt.Println(fname, " [Producer] beanstalk put body:", msg) //time.Sleep(1 * time.Second) } c.Close() fmt.Println("Producer() end.") } func Consumer(fname, tubeName string) { if fname == "" || tubeName == "" { return } c, err := beanstalk.Dial("tcp", "127.0.0.1:11300") if err != nil { panic(err) } defer c.Close() c.Tube.Name = tubeName c.TubeSet.Name[tubeName] = true fmt.Println(fname, " [Consumer] tubeName:", tubeName, " c.Tube.Name:", c.Tube.Name) substr := "timeout" for { fmt.Println(fname, " [Consumer]///////////////////////// ") //从队列中取出 id, body, err := c.Reserve(1 * time.Second) if err != nil { if !strings.Contains(err.Error(), substr) { fmt.Println(fname, " [Consumer] [", c.Tube.Name, "] err:", err, " id:", id) } continue } fmt.Println(fname, " [Consumer] [", c.Tube.Name, "] job:", id, " body:", string(body)) //从队列中清掉 err = c.Delete(id) if err != nil { fmt.Println(fname, " [Consumer] [", c.Tube.Name, "] Delete err:", err, " id:", id) } else { fmt.Println(fname, " [Consumer] [", c.Tube.Name, "] Successfully deleted. id:", id) } fmt.Println(fname, " [Consumer]/////////////////////////") //time.Sleep(1 * time.Second) } fmt.Println("Consumer() end. 
") } func main() { runtime.GOMAXPROCS(runtime.NumCPU()) go Producer("PA", TubeName1) go Producer("PB", TubeName2) go Consumer("CA", TubeName1) go Consumer("CB", TubeName2) time.Sleep(10 * time.Second) } /* 运行结果: XCLdeiMac:src xcl$ clear XCLdeiMac:src xcl$ go run testmq.go CB [Consumer] tubeName: channel2 c.Tube.Name: channel2 CA [Consumer] tubeName: channel1 c.Tube.Name: channel1 CB [Consumer]///////////////////////// CA [Consumer]///////////////////////// PB [Producer] tubeName: channel2 c.Tube.Name: channel2 PA [Producer] tubeName: channel1 c.Tube.Name: channel1 PB [Producer] beanstalk put body: for channel2 0 PA [Producer] beanstalk put body: for channel1 0 CA [Consumer] [ channel1 ] job: 47027 body: for channel1 0 CB [Consumer] [ channel2 ] job: 47026 body: for channel2 0 PB [Producer] beanstalk put body: for channel2 1 PA [Producer] beanstalk put body: for channel1 1 CB [Consumer] [ channel2 ] Successfully deleted. id: 47026 CB [Consumer]///////////////////////// CB [Consumer]///////////////////////// CA [Consumer] [ channel1 ] Successfully deleted. id: 47027 CA [Consumer]///////////////////////// CA [Consumer]///////////////////////// CA [Consumer] [ channel1 ] job: 47028 body: for channel1 1 PA [Producer] beanstalk put body: for channel1 2 CB [Consumer] [ channel2 ] job: 47029 body: for channel2 1 PB [Producer] beanstalk put body: for channel2 2 PA [Producer] beanstalk put body: for channel1 3 CA [Consumer] [ channel1 ] Successfully deleted. id: 47028 CA [Consumer]///////////////////////// CA [Consumer]///////////////////////// CB [Consumer] [ channel2 ] Successfully deleted. id: 47029 PB [Producer] beanstalk put body: for channel2 3 CB [Consumer]///////////////////////// CB [Consumer]///////////////////////// PB [Producer] beanstalk put body: for channel2 4 CB [Consumer] [ channel2 ] job: 47030 body: for channel2 2 CA [Consumer] [ channel1 ] job: 47031 body: for channel1 2 PA [Producer] beanstalk put body: for channel1 4 Producer() end. Producer() end. 
CA [Consumer] [ channel1 ] Successfully deleted. id: 47031 CA [Consumer]///////////////////////// CA [Consumer]///////////////////////// CB [Consumer] [ channel2 ] Successfully deleted. id: 47030 CB [Consumer]///////////////////////// CB [Consumer]///////////////////////// CB [Consumer] [ channel2 ] job: 47033 body: for channel2 3 CA [Consumer] [ channel1 ] job: 47032 body: for channel1 3 CB [Consumer] [ channel2 ] Successfully deleted. id: 47033 CA [Consumer] [ channel1 ] Successfully deleted. id: 47032 CB [Consumer]///////////////////////// CB [Consumer]///////////////////////// CA [Consumer]///////////////////////// CA [Consumer]///////////////////////// CA [Consumer] [ channel1 ] job: 47034 body: for channel1 4 CB [Consumer] [ channel2 ] job: 47035 body: for channel2 4 CB [Consumer] [ channel2 ] Successfully deleted. id: 47035 CB [Consumer]///////////////////////// CA [Consumer] [ channel1 ] Successfully deleted. id: 47034 CB [Consumer]///////////////////////// XCLdeiMac:src xcl$ */可用beanstool来查看队列状态
也可以参考我写的下面两段代码来查看。
ar, er := c.ListTubes() if er != nil { fmt.Println("[Example] er:", er) } else { for i, v := range ar { fmt.Println("[Example] ListTubes i:", i, " v:", v) c.Tube.Name = v id, body, err := c.Reserve(5 * time.Second) if err != nil { fmt.Println("[Example] err:", err, " name:", c.Tube.Name) continue } else { fmt.Println("[Example] job:", id) fmt.Println("[Example] body:", string(body)) } } } func tubeStatus(c *beanstalk.Conn) { fmt.Println("[tubeStatus]/////////////////////////") fmt.Println("Tube(", c.Tube.Name, ") Stats:") m, er := c.Tube.Stats() if er != nil { fmt.Println("[tubeStatus] err:", er) } else { for k, v := range m { fmt.Println(k, " : ", v) } } fmt.Println("[tubeStatus]/////////////////////////") }从测试看,Beanstalkd 足以满足我现在的需求了.
BLOG: http://blog.csdn.net/xcl168
版权声明:本文为博主原创文章,未经博主允许不得转载。
标签:beanstalkd go golang 消息队列 messagequeue
原文地址:http://blog.csdn.net/xcl168/article/details/47705725