NSQ Endpoint

NSQ Endpoint creates and starts an NSQ subscription service. It subscribes to data from different topics and routes each message to the matching rule chain for processing.

TIP

This is an extension component; it requires importing the additional extension library rulego-components.

# Type

endpoint/nsq

# Startup Configuration

| Field | Type | Required | Description | Default Value |
|-------|------|----------|-------------|---------------|
| server | string | Yes | NSQ server address. Supported formats: (1) single nsqd: "127.0.0.1:4150"; (2) multiple nsqd: "127.0.0.1:4150,127.0.0.1:4151"; (3) lookupd addresses: "http://127.0.0.1:4161,http://127.0.0.1:4162" | 127.0.0.1:4150 |
| channel | string | No | Default channel name, used when no channel is specified while adding a router | default |
| authToken | string | No | NSQ authentication token | - |
| certFile | string | No | TLS certificate file path | - |
| certKeyFile | string | No | TLS private key file path | - |

# Configuration Example

{
  "server": "127.0.0.1:4150",
  "channel": "my-channel",
  "authToken": "your-auth-token",
  "certFile": "/path/to/cert.pem",
  "certKeyFile": "/path/to/key.pem"
}
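
To illustrate how the fields above fit together, here is a minimal sketch that decodes such a configuration, applies the documented defaults, and splits the `server` value into nsqd and lookupd addresses. The `Config` struct, `parseConfig`, and `splitServers` are hypothetical names for illustration, not the types exported by rulego-components, and treating an `http://` or `https://` prefix as the lookupd marker is an assumption drawn from the documented address formats.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// Config mirrors the documented startup fields. It is a hypothetical
// stand-in, not the configuration type shipped with rulego-components.
type Config struct {
	Server      string `json:"server"`
	Channel     string `json:"channel"`
	AuthToken   string `json:"authToken"`
	CertFile    string `json:"certFile"`
	CertKeyFile string `json:"certKeyFile"`
}

// parseConfig decodes the JSON and fills in the documented defaults
// for fields that are missing or empty.
func parseConfig(raw []byte) (Config, error) {
	var cfg Config
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return Config{}, err
	}
	if cfg.Server == "" {
		cfg.Server = "127.0.0.1:4150"
	}
	if cfg.Channel == "" {
		cfg.Channel = "default"
	}
	return cfg, nil
}

// splitServers separates the comma-separated server field into nsqd TCP
// addresses and lookupd HTTP addresses. Using the URL scheme as the
// lookupd marker is an assumption based on the documented examples.
func splitServers(server string) (nsqd, lookupd []string) {
	for _, addr := range strings.Split(server, ",") {
		addr = strings.TrimSpace(addr)
		if addr == "" {
			continue
		}
		if strings.HasPrefix(addr, "http://") || strings.HasPrefix(addr, "https://") {
			lookupd = append(lookupd, addr)
		} else {
			nsqd = append(nsqd, addr)
		}
	}
	return nsqd, lookupd
}

func main() {
	cfg, err := parseConfig([]byte(`{"server": "127.0.0.1:4150,127.0.0.1:4151"}`))
	if err != nil {
		panic(err)
	}
	nsqd, lookupd := splitServers(cfg.Server)
	fmt.Println("channel:", cfg.Channel, "nsqd:", nsqd, "lookupd:", lookupd)
}
```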

# Router Configuration

When adding a route, you can pass the channel (consumer group) as a parameter. Channel priority, highest first: AddRouter parameter > global configuration > default value ("default"):

// Add route using global configuration channel, or "default" if not configured
routerId, err := nsqEndpoint.AddRouter(router)

// Add route specifying channel as "my-channel", takes priority over global configuration
routerId, err := nsqEndpoint.AddRouter(router, "my-channel")
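
The priority order can be sketched as a small lookup. `resolveChannel` is a hypothetical helper written for illustration, not part of the component's API; it assumes the optional channel arrives as the first variadic argument, as in the `AddRouter` calls above.

```go
package main

import "fmt"

// defaultChannel is the documented fallback channel name.
const defaultChannel = "default"

// resolveChannel applies the documented priority: an explicit parameter
// first, then the global configuration, then "default".
// Hypothetical helper; the real component may resolve this differently.
func resolveChannel(globalChannel string, params ...interface{}) string {
	if len(params) > 0 {
		// Only a non-empty string parameter counts as an explicit channel.
		if ch, ok := params[0].(string); ok && ch != "" {
			return ch
		}
	}
	if globalChannel != "" {
		return globalChannel
	}
	return defaultChannel
}

func main() {
	fmt.Println(resolveChannel("global-channel", "my-channel"))
	fmt.Println(resolveChannel("global-channel"))
	fmt.Println(resolveChannel(""))
}
```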

# Response

Before calling exchange.Out.SetBody to send a response, set the responseTopic parameter through either exchange.Out.Headers() or exchange.Out.GetMsg().Metadata. The component then publishes the response body to that topic:

exchange.Out.GetMsg().Metadata.PutValue("responseTopic", "device.msg.response")
// or
exchange.Out.Headers().Add("responseTopic", "device.msg.response")

exchange.Out.SetBody([]byte("ok"))

Response parameter configuration:

| Field | Type | Required | Description | Default Value |
|-------|------|----------|-------------|---------------|
| responseTopic | string | Yes | Response topic | - |

# Message Format

Received NSQ messages are converted to RuleMsg, containing the following metadata:

  • messageId: NSQ message ID
  • attempts: Message retry count
  • timestamp: Message timestamp
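
The mapping from an NSQ message to these metadata keys might look like the following sketch. `nsqMessage` is a local stand-in that mirrors the ID, Body, Timestamp, and Attempts fields of the go-nsq `Message` type so the example runs without external dependencies; `toMetadata` and the decimal string formatting of the numeric values are assumptions, not the component's confirmed behavior.

```go
package main

import (
	"fmt"
	"strconv"
)

// nsqMessage is a local stand-in mirroring the go-nsq Message fields
// used here; it is not the library type itself.
type nsqMessage struct {
	ID        [16]byte // NSQ message IDs are 16 bytes long
	Body      []byte
	Timestamp int64 // nanoseconds since the Unix epoch
	Attempts  uint16
}

// toMetadata builds the three documented metadata entries.
// Formatting numbers as decimal strings is an assumption.
func toMetadata(m nsqMessage) map[string]string {
	return map[string]string{
		"messageId": string(m.ID[:]),
		"attempts":  strconv.FormatUint(uint64(m.Attempts), 10),
		"timestamp": strconv.FormatInt(m.Timestamp, 10),
	}
}

func main() {
	var id [16]byte
	copy(id[:], "0a1b2c3d4e5f6071")

	msg := nsqMessage{
		ID:        id,
		Body:      []byte(`{"temperature": 21.5}`),
		Timestamp: 1700000000000000000,
		Attempts:  1,
	}
	for k, v := range toMetadata(msg) {
		fmt.Printf("%s=%s\n", k, v)
	}
}
```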

# Working Principle

  1. On startup, the component connects to the NSQ server according to its configuration.
  2. It can connect to a single nsqd instance or to several, and it can also discover nsqd instances through lookupd.
  3. It creates a consumer for each route; routes with the same topic and channel share one consumer.
  4. When a message arrives, the component converts it to a RuleMsg and routes it to the corresponding rule chain for processing.
  5. Processing results can be sent back through a response topic.
  6. Once processing completes, the message is acknowledged automatically.
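
The consumer sharing described in step 3 can be sketched as a registry keyed by topic and channel. This is an illustrative model only: `registry`, `addRoute`, and `dispatch` are hypothetical names, and the real component manages actual NSQ consumers rather than the plain structs used here.

```go
package main

import "fmt"

// consumerKey identifies a consumer by its topic and channel.
type consumerKey struct {
	topic   string
	channel string
}

// consumer is a placeholder for a real NSQ consumer; it only records
// which routes are attached to it.
type consumer struct {
	routeIDs []string
}

// registry keeps one consumer per topic+channel pair.
type registry struct {
	consumers map[consumerKey]*consumer
}

func newRegistry() *registry {
	return &registry{consumers: make(map[consumerKey]*consumer)}
}

// addRoute attaches routeID to the consumer for topic+channel and
// reports whether a new consumer had to be created.
func (r *registry) addRoute(routeID, topic, channel string) bool {
	key := consumerKey{topic: topic, channel: channel}
	c, exists := r.consumers[key]
	if !exists {
		c = &consumer{}
		r.consumers[key] = c
	}
	c.routeIDs = append(c.routeIDs, routeID)
	return !exists
}

// dispatch returns the routes that should receive a message arriving
// on the given topic and channel.
func (r *registry) dispatch(topic, channel string) []string {
	if c, ok := r.consumers[consumerKey{topic: topic, channel: channel}]; ok {
		return c.routeIDs
	}
	return nil
}

func main() {
	reg := newRegistry()
	fmt.Println(reg.addRoute("r1", "device.msg", "default"))    // first route creates the consumer
	fmt.Println(reg.addRoute("r2", "device.msg", "default"))    // second route reuses it
	fmt.Println(reg.addRoute("r3", "device.msg", "my-channel")) // different channel, new consumer
	fmt.Println(reg.dispatch("device.msg", "default"))
}
```

Keying on both topic and channel matters because, in NSQ, each channel receives its own copy of a topic's messages, so two routes on different channels cannot share a consumer.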

# Examples

Example code using endpoints:

  • RestEndpoint
  • WebsocketEndpoint
  • MqttEndpoint
  • ScheduleEndpoint
  • NetEndpoint
  • NSQEndpoint (extension component library)
Last Updated: 2025/08/03, 15:15:53
Theme by Vdoing | Copyright © 2023-2025 RuleGo Team | Apache 2.0 License
