Nsq搭建与使用

1. 下载

下载之后解压,并将其bin路径添加到环境变量当中

github地址: https://github.com/nsqio/nsq/releases

文档: https://nsq.io/overview/quick_start.html

2. 使用说明

2.1 启动nsqlookupd

1
nsqlookupd

它会监听两个端口: http: 4161 客户端用它来发现和管理。 tcp: 4160 nsqd 用它来广播

可选参数:

  • http-address="127.0.0.1:4161" : 监听 HTTP 客户端地址
  • inactive-producer-timeout=5m0s: 从上次 ping 之后,生产者驻留在活跃列表中的时长
  • tcp-address="0.0.0.0:4160": TCP 客户端监听的地址
  • broadcast-address: 这个 lookupd 节点的外部地址, (默认主机名)
  • tombstone-lifetime=45s: 生产者保持 tombstoned 的时长
  • verbose=false: 允许输出日志
  • version=false: 打印版本信息

2.2 启动nsqd

1
nsqd --lookupd-tcp-address=127.0.0.1:4160

它是一个守护进程,负责接收消息,传递消息给客户端,排队。 会监听两个端口: http: 4151, tcp: 4150

3.3 启动nsqadmin

1
nsqadmin --lookupd-http-address=127.0.0.1:4161

它是一个Web页面,负责管理我们的消息队列, 它后面的地址即是我们在 nsqlookupd 里面http-address参数配置的地址,nsqadmin的监听地址为4171,通过127.0.0.1:4171地址可打开NSQ的Web管理页面

3. 基本使用

Channel是消费者订阅特定Topic的一种抽象。对于发往Topic的消息,nsqd向该Topic下的所有Channel投递消息,而同一个Channel只投递一次,Channel下如果存在多个消费者,则随机选择一个消费者做投递。这种投递方式可以被用作消费者负载均衡。和Topic一样,Channel同样有永久和临时之分,永久的Channel只能通过显式删除销毁,临时的Channel在最后一个消费者断开连接的时候被销毁

  • Topic 就是一个通道,我们可以往这个Topic里面发送消息
  • Channel起到一个负载均衡的作用,我们可以在一个Topic中建立多个Channel来共同消费这个Topic里面的消息。


    我们建立了一个叫 test-dev的Topic,Channel为default

往通道里面发送消息

1
curl -d 'hello world' 'http://127.0.0.1:4151/pub?topic=test-dev'

从通道中消费消息,这里我们要指定从哪个Channel里消费,

1
nsq_to_file --topic=test-dev -channel=default --output-dir=log --lookupd-http-address=127.0.0.1:4161

此时就会在当前目前下生成一个 log 文件夹,里面存放的就是我们这个Channel里的消息

4. 封装代码

安装第三方库

1
go get -u github.com/youzan/go-nsq
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package nsq

import (
"flag"
"fmt"
"sync"
"time"

"github.com/youzan/go-nsq"
)

func init() {
flag.StringVar(&env,"env","dev","the nsq environment")
}

var (
env string
pubMu = &sync.RWMutex{}
pubMgrs = make(map[string]*nsq.TopicProducerMgr)
consumers = &sync.Map{}
)

type Config struct {
config *nsq.Config
lookAddr string
}

var DefaultConfig = func() *Config {
cfg := nsq.NewConfig()
return &Config{
config: cfg,
lookAddr: "127.0.0.1:4161",
}
}()

func getPubMgr(topic string)(*nsq.TopicProducerMgr,error){
pubMu.RLock()
if pubMgr,ok := pubMgrs[topic];ok {
pubMu.RUnlock()
return pubMgr,nil
}
pubMu.RUnlock()

pubMu.Lock()
defer pubMu.Unlock()

pubMgr,err := nsq.NewTopicProducerMgr([]string{topic},DefaultConfig.config)
if err != nil {
return nil,err
}
err = pubMgr.ConnectToNSQLookupd(DefaultConfig.lookAddr)
if err != nil {
return nil,err
}
pubMgrs[topic] = pubMgr

return pubMgr,nil
}

func wrapTopic(topic string) string {
return fmt.Sprintf("%s-%s",topic,env)
}

func Publish(topic string, data []byte) error {
topic = wrapTopic(topic)
pubMgr, err := getPubMgr(topic)
if err != nil {
return err
}
return pubMgr.Publish(topic,data)
}

func Consume(topic,channel string, handlerFunc nsq.HandlerFunc,concurrency int) error {
topic = wrapTopic(topic)
consumer, err := nsq.NewConsumer(topic, channel, DefaultConfig.config)
if err != nil {
return err
}
consumer.AddConcurrentHandlers(handlerFunc,concurrency)
// set the consumer to map for close
key := topic+":"+channel
consumers.Store(key,consumer)

return consumer.ConnectToNSQLookupd(DefaultConfig.lookAddr)
}

func Close() {
// 记录关闭过得mgr,每个pubMgr仅可被关闭一次
closedPubMgrs := &sync.Map{}
pubMu.RLock()
for _,pubMgr := range pubMgrs{
if _,ok := closedPubMgrs.Load(pubMgr);ok {
continue
}
closedPubMgrs.Store(pubMgr, struct{}{})
pubMgr.Stop()
}
pubMu.RUnlock()

// close the consumer
consumers.Range(func(key, value interface{}) bool {
if consumer,ok := value.(*nsq.Consumer);ok{
consumer.Stop()
select {
case <-consumer.StopChan:
case <-time.After(time.Second * 60):
//等待一分钟让其关闭handler
}
}
consumers.Delete(key)
return true
})
}

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package nsq

import (
"github.com/youzan/go-nsq"
"testing"
)
func TestPublish(t *testing.T) {
err := Publish("test", []byte("Hello Pibigstar"))
if err != nil {
t.Fatal(err)
}
c := make(chan struct{})

Consume("test", "default", func(msg *nsq.Message) error {
t.Log(string(msg.Body))
select {
case <-c:
default:
close(c)
}
return nil
}, 5)
<-c
}

如果你不想使用有赞的第三方库,你可以使用下面这个:

1
go get -u github.com/nsqio/go-nsq

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package main

import (
"fmt"
"github.com/nsqio/go-nsq"
"time"
)

var (
addr = "127.0.0.1:4150"
defaultConfig = nsq.NewConfig()
)

func main() {
Producer("test-dev",[]byte("Hello Pibigstar"))
Consumer("test-dev","default",HandleMessage)
time.Sleep(time.Second * 3)
}

func HandleMessage(msg *nsq.Message) error {
fmt.Println(string(msg.Body))
return nil
}

// nsq发布消息
func Producer(topic string,data []byte) error {
// 新建生产者
p, err := nsq.NewProducer(addr, defaultConfig)
if err != nil {
panic(err)
}
// 发布消息
return p.Publish(topic, data)
}

// 消费消息
func Consumer(topic,channel string,handlerFunc nsq.HandlerFunc) error {
//新建一个消费者
c, err := nsq.NewConsumer(topic, channel, defaultConfig)
if err != nil {
panic(err)
}
//添加消息处理
c.AddHandler(handlerFunc)
//建立连接
return c.ConnectToNSQD(addr)
}
-------------本文结束感谢您的阅读-------------