NSQ(有赞版)

NSQ参数说明 请参阅 NSQ参数

####核心概念

  • Topic ——一个topic就是程序发布消息的一个逻辑键,当程序第一次发布消息时就会创建topic。

  • Channels ——channel组与消费者相关,是消费者之间的负载均衡,channel在某种意义上来说是一个“队列”。每当一个发布者发送一条消息到一个topic,消息会被复制到所有消费者连接的channel上,消费者通过这个特殊的channel读取消息,实际上,在消费者第一次订阅时就会创建channel。

  • Channel会将消息进行排列,如果没有消费者读取消息,消息首先会在内存中排队,当量太大时就会被保存到磁盘中。

  • Message s——消息构成了我们数据流的中坚力量,消费者可以选择结束消息,表明它们正在被正常处理,或者重新将他们排队待到后面再进行处理。每个消息包含传递尝试的次数,当消息传递超过一定的阀值次数时,我们应该放弃这些消息,或者作为额外消息进行处理。

  • NSQ在操作期间同样运行着两个程序:

    • Nsqd ——nsqd守护进程是NSQ的核心部分,它是一个单独的监听某个端口进来的消息的二进制程序。每个nsqd节点都独立运行,不共享任何状态。当一个节点启动时,它向一组nsqlookupd节点进行注册操作,并将保存在此节点上的topic和channel进行广播。

      客户端可以发布消息到nsqd守护进程上,或者从nsqd守护进程上读取消息。通常,消息发布者会向一个单一的local nsqd发布消息,消费者从连接了的一组nsqd节点的topic上远程读取消息。如果你不关心动态添加节点功能,你可以直接运行standalone模式。

    • Nsqlookupd ——nsqlookupd服务器像consul或etcd那样工作,只是它被设计得没有协调和强一致性能力。每个nsqlookupd都作为nsqd节点注册信息的短暂数据存储区。消费者连接这些节点去检测需要从哪个nsqd节点上读取消息。

    • NSQ 重要名词
      1、In-Flight
      客户端读到消息,正在处理的部分(可以理解成没有 NSQ 的 ACK)

      2、Finish
      正常流程下,处理完的逻辑,需要手动标记已经处理完了 msg.Finish()
      不标记的消息,过了一段时间后,会有重排机制,这种重排会导致整个消息的消费变慢。

      3、Requeue
      人工标记重排,该类重排属于正常业务逻辑范畴,不会拖慢消费速度。

####NSQ 组件

  • nsqd 是一个守护进程,负责接收,排队,投递消息给客户端。

  • nsqlookupd 是守护进程负责管理拓扑信息。

    客户端通过查询 nsqlookupd 来发现指定话题(topic)的生产者,并且 nsqd 节点广播话题(topic)和通道(channel)信息。

    有两个接口:TCP 接口,nsqd 用它来广播。HTTP 接口,客户端用它来发现和管理。

  • nsqadmin 是一套 WEB UI,用来汇集集群的实时统计,并执行不同的管理任务。

Linux 下安装

  • 安装使用 nsq环境
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    1、获取docker 镜像
    docker pull nsqio/nsq

    2、运行 nsqlookupd
    docker run -d --name nsqlookupd -p 4160:4160 -p 4161:4161 nsqio/nsq /nsqlookupd

    - -http-address="0.0.0.0:4161": <addr>:<port> 监听 HTTP 客户端

    - -tcp-address="0.0.0.0:4160": TCP 客户端监听的 <addr>:<port>

    3 docker 运行 nsqd
    docker run --name nsqd -p 4150:4150 -p 4151:4151 nsqio/nsq /nsqd --broadcast-address=127.0.0.1 --lookupd-tcp-address=127.0.0.1:4160

    - -broadcast-address="": 通过 lookupd 注册的地址(默认名是 OS)

    - -lookupd-tcp-address=: 解析 TCP 地址名字 (可能会给多次)

    4 docker 运行 nsqadmin
    docker run --name nsqadmin -p 4171:4171 nsqio/nsq /nsqadmin --lookupd-http-address=127.0.0.1:4161

    - -graphite-url="": URL to graphite HTTP 地址

    - -http-address="0.0.0.0:4171": <addr>:<port> HTTP clients 监听的地址和端口

    - -lookupd-http-address=[]: lookupd HTTP 地址 (可能会提供多次)

    - -notification-http-endpoint="": HTTP 端点 (完全限定) ,管理动作将会发送到

    - -nsqd-http-address=[]: nsqd HTTP 地址 (可能会提供多次)

一些认识

  • 发布者:消息发布,只能面向具体的nsqd服务进行

    • 一个topic的发布者只对应一个具体的NSQD,但可以多个发布者同时向一个NSQD发送消息,他们是N:1的关系。
      NSQD与topic是1:N的关系。
  • 消费者:

    • consumer要接收消息,是要连接到具体的nsqd服务的。通常我们能通过封装好的方法,基于lookupd服务来获取所有的nsqd服务地址并连接。

    • 一个消费者订阅的topic分布在哪些nsqd服务中,则会直接连接。nsqd之间是绝对不会互传topic的具体数据的。下图描绘了consumer与nsqd的关系:

    • 当多个nsqd服务都有相同的topic的时候,consumer要修改默认设置config.MaxInFlight才能连接。

    • consumer与topic没有直接联系,而是通过具体的channel接受数据。如果consumer退出,channel不会自动删除。 如果不再需要,需要通过http端口删除channel,否则很可能会导致磁盘空间不足。

TEST

1
2

curl -d 'Hello Linux localhost Msg' 'http://127.0.0.1:4151/pub?topic=test'