知方号

知方号

NSQ 消息队列使用实战<消息队列订阅消息怎么删除>

网上看了好多,都是抄个官网 README,很多重要的东西不说清楚。只好自己研究了一下。

NSQ 的全家桶介绍

nsqd:守护进程,客户端通信。默认端口 4150(TCP) 4151(HTTP)

nsqlookupd:相当于一个路由器。客户端可以经由它发现生产者、nsqd 广播的话题。**一个 nsqlookupd 能够管理一群 nsqd。**默认端口::4160(TCP),:4161(HTTP)

nsqadmin:在线面板,能够通过浏览器直接访问。默认端口 :4171

从命令行启动

可以直接下载二进制文件。开三个终端,分别执行:

nsqlookupdnsqd --lookupd-tcp-address=127.0.0.1:4160 --broadcast-address=127.0.0.1nsqadmin --lookupd-http-address=127.0.0.1:4161go-nsq 的使用

我封装了一个包:

1package mq 2 3import ( 4"encoding/json" 5"fmt" 6"time" 7 8"github.com/nsqio/go-nsq" 9"go.uber.org/zap"10)1112type MessageQueueConfig struct {13NsqAddr string14NsqLookupdAddr string15SupportedTopics []string16}1718type MessageQueue struct {19config MessageQueueConfig20producer *nsq.Producer21consumers map[string]*nsq.Consumer22}2324func NewMessageQueue(config MessageQueueConfig) (mq *MessageQueue, err error) {25zap.L().Debug("New message queue")26producer, err := initProducer(config.NsqAddr)27if err != nil {28return nil, err29}30consumers := make(map[string]*nsq.Consumer)31for _, topic := range config.SupportedTopics {32nsq.Register(topic,"default")33consumers[topic], err = initConsumer(topic, "default", config.NsqAddr)34if err != nil {35return36}37}38return &MessageQueue{39config: config,40producer: producer,41consumers: consumers,42}, nil43}4445func (mq *MessageQueue) Run() {46for name, c := range mq.consumers {47zap.L().Info("Run consumer for " + name)48// c.ConnectToNSQLookupd(mq.config.NsqLookupdAddr)49c.ConnectToNSQD(mq.config.NsqAddr)50}51}5253func initProducer(addr string) (producer *nsq.Producer, err error) {54zap.L().Debug("initProducer to " + addr)55config := nsq.NewConfig()56producer, err = nsq.NewProducer(addr, config)57return58}5960func initConsumer(topic string, channel string, address string) (c *nsq.Consumer, err error) {61zap.L().Debug("initConsumer to " + topic + "/" + channel)62config := nsq.NewConfig()63config.LookupdPollInterval = 15 * time.Second64c, err = nsq.NewConsumer(topic, channel, config)65return66}6768func (mq *MessageQueue) Pub(name string, data interface{}) (err error) {69body, err := json.Marshal(data)70if err != nil {71return72}73zap.L().Info("Pub " + name + " to mq. data = " + string(body))74return mq.producer.Publish(name, body)75}7677type Messagehandler func(v []byte)7879func (mq *MessageQueue) Sub(name string, handler Messagehandler) (err error) {80zap.L().Info("Subscribe " + name)81v, ok := mq.consumers[name]82if !ok {83err = fmt.Errorf("No such topic: " + name)84return85}86v.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {87handler(message.Body)88return nil89}))90return91}

使用示例:

1m, err := mq.NewMessageQueue(mq.MessageQueueConfig{ 2NsqAddr: "127.0.0.1:4150", 3NsqLookupdAddr: "127.0.0.1:4161", 4SupportedTopics: []string{"hello"}, 5}) 6 7if err != nil { 8zap.L().Fatal("Message queue error: " + err.Error()) 9}1011m.Sub("hello", func(resp []byte) {12zap.L().Info("S1 Got: " + string(resp))13})14m.Sub("hello", func(resp []byte) {15zap.L().Info("S2 Got: " + string(resp))16})17m.Run()18err = m.Pub("hello", "world")19if err != nil {20zap.L().Fatal("Message queue error: " + err.Error())21}22err = m.Pub("hello", "tom")23if err != nil {24zap.L().Fatal("Message queue error: " + err.Error())25}2627sigChan := make(chan os.Signal, 1)28signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)29

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至lizi9903@foxmail.com举报,一经查实,本站将立刻删除。