nsq基础知识与简单demo

NSQ

A realtime distributed messaging platform

优势

  • 基于golang
  • 分布式
  • 水平扩展
  • 自带UI,操作友好
  • 多语言client

组件

组件 功能
nsqd 接收、排队和向客户端传递消息的守护进程
nsqlookupd 管理拓扑信息的守护进程
nsqadmin Web UI,用于实时查看聚合的集群统计信息并执行各种管理任务
utilities 常见基础功能、数据流处理工具,如nsq_stat、nsq_tail、nsq_to_file、nsq_to_http、nsq_to_nsq、to_nsq

nsqd

它可以独立运行,但通常与nsqlookupd 实例一起配置在集群中(在这种情况下,它将宣布主题和频道以供发现)。

它侦听两个 TCP 端口,一个用于客户端,另一个用于 HTTP API。它可以选择在第三个端口上侦听 HTTPS。

nsqlookupd

有两个接口:nsqd用于广播的TCP 接口和用于客户端执行发现和管理操作的 HTTP 接口。

MAC安装

brew install nsq
nsqlookupd
nsqd –lookupd-tcp-address=127.0.0.1:4160 –broadcast-address=127.0.0.1
nsqadmin –lookupd-http-address=127.0.0.1:4161

go-nsq

// producer
package nsq

import (
	"context"
	"fmt"
	"time"

	"github.com/nsqio/go-nsq"
	"github.com/spf13/cast"
)

// 主函数
func Send(ctx context.Context, cancel context.CancelFunc, topic string) {
	defer cancel()
	str := "127.0.0.1:4150"
	fmt.Println("address: ", str)
	producer, err := nsq.NewProducer(str, nsq.NewConfig())
	if err != nil {
		panic(err)
	}

	producer.SetLogger(nil, 0)

	for i := 0; i < 5; i++ {
		msg := "puresai, " + cast.ToString(i)
		fmt.Println("publish", msg, producer.Publish(topic, []byte(msg)))
		time.Sleep(time.Second * 1)
	}

	<-ctx.Done()
	producer.Stop()
	fmt.Println("producer exit")
}
// consumer
package nsq

import (
	"context"
	"fmt"
	"time"

	"github.com/nsqio/go-nsq"
)

// 消费者
type Consumer struct{}

// 主函数
func Receive(ctx context.Context, cancel context.CancelFunc, topic string) {
	defer cancel()
	// address := "127.0.0.1:4161"

	channel := topic + "-channel"
	cfg := nsq.NewConfig()
	cfg.LookupdPollInterval = time.Second * 2
	c, err := nsq.NewConsumer(topic, channel, cfg)
	if err != nil {
		panic(err)
	}
	c.SetLogger(nil, 0) //屏蔽系统日志
	c.AddConcurrentHandlers(&Consumer{}, 3)

	//建立NSQLookupd连接
	// if err := c.ConnectToNSQLookupd(address); err != nil {
	// 	panic(err)
	// }

	//建立多个nsqd连接
	if err := c.ConnectToNSQDs([]string{"127.0.0.1:4150"}); err != nil {
		panic(err)
	}
	<-ctx.Done()
	c.Stop()
	fmt.Println("consumer exit")
}

// 处理消息
func (*Consumer) HandleMessage(msg *nsq.Message) error {
	fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
	return nil
}
// nsq_test
package nsq

import (
	"context"
	"os"
	"os/signal"
	"syscall"
	"testing"
	"time"
)

func TestReceive(t *testing.T) {
	topic := "sai0556"
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go Send(ctx, cancel, topic)
	go Receive(ctx, cancel, topic)

	sig := make(chan os.Signal)
	signal.Notify(sig, syscall.SIGTERM, syscall.SIGINT)
	t.Log("开始监听")

	select {
	case <-ctx.Done():
		t.Log("ctx done")
		return
	case <-sig:
		t.Log("signal exit...")
		cancel()
		time.Sleep(2 * time.Second)
		return
	}

	// send(topic)

}

测试走一波,

nsq

对于NSQ,自己也是刚刚使用,给我的感觉是相当好上手,之前有用过RabbitMQ,nsq相比来说更简单,可能是go-client相对好用一些吧。
后续有其他值得分享的点再继续补充。如有需要交流,可联系我email/qq。

参考


nsq基础知识与简单demo
https://blog.puresai.com/2021/07/31/360/
作者
puresai
许可协议