RuleGo RuleGo
🏠Home
  • Quick Start
  • Rule Chain
  • Standard Components
  • Extension Components
  • Custom Components
  • Visualization
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • Trigger
  • Advanced Topics
  • Performance
  • Standard Components
  • Extension Components
  • Custom Components
  • Components Marketplace
  • Overview
  • Quick Start
  • Routing
  • DSL
  • API
  • Options
  • Components
🔥Editor (opens new window)
  • RuleGo Editor (opens new window)
  • RuleGo Server (opens new window)
  • StreamSQL
  • Github (opens new window)
  • Gitee (opens new window)
  • Changelog (opens new window)
  • English
  • 简体中文
🏠Home
  • Quick Start
  • Rule Chain
  • Standard Components
  • Extension Components
  • Custom Components
  • Visualization
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • Trigger
  • Advanced Topics
  • Performance
  • Standard Components
  • Extension Components
  • Custom Components
  • Components Marketplace
  • Overview
  • Quick Start
  • Routing
  • DSL
  • API
  • Options
  • Components
🔥Editor (opens new window)
  • RuleGo Editor (opens new window)
  • RuleGo Server (opens new window)
  • StreamSQL
  • Github (opens new window)
  • Gitee (opens new window)
  • Changelog (opens new window)
  • English
  • 简体中文

广告采用随机轮播方式显示 ❤️成为赞助商
  • Quick Start

  • Rule Chain

  • Standard Components

  • Extension Components

  • Custom Components

  • Components marketplace

  • Visualization

  • AOP

  • Trigger

  • Advanced Topic

  • RuleGo-Server

  • FAQ

  • Endpoint Module

    • Endpoint Overview
    • Quick Start
    • Router
    • DSL
    • API
    • Options
    • Endpoints

      • Rest Endpoint
      • Websocket Endpoint
      • MQTT Endpoint
      • Schedule Endpoint
      • Net Endpoint
      • Kafka Endpoint
      • Nats Endpoint
      • Redis Sub Endpoint
      • Redis Stream Endpoint
      • Rabbitmq Endpoint
      • MYSQL CDC Endpoint
      • OPC_UA Endpoint
      • gRPC Stream Endpoint
      • Beanstalkd Endpoint
      • Wukongim Endpoint
      • Extend Endpoint
      • NSQ Endpoint
      • Pulsar Endpoint
        • Type
        • Startup Configuration
        • Router Configuration
        • Response
        • Message Format
        • Working Principle
        • Advanced Features
          • Message Property Propagation
          • Response Message Properties
        • Configuration Example
        • Examples
  • Support

  • StreamSQL

目录

Pulsar Endpoint

Pulsar Endpoint is used to create and start Pulsar subscription services. It can subscribe to different topic data and route them to different rule chains for processing.

TIP

This component is an extension component that requires importing an additional extension library: rulego-components (opens new window)

# Type

endpoint/pulsar

# Startup Configuration

Field Type Required Description Default Value
server string Yes Pulsar server address, format: pulsar://host:port pulsar://localhost:6650
subscriptionName string No Default subscription name, used if not specified when adding router default
subscriptionType string No Subscription type, supports: Exclusive, Shared, Failover, KeyShared (case insensitive) Shared
authToken string No Pulsar JWT authentication token -
certFile string No TLS certificate file path -
certKeyFile string No TLS private key file path -

# Router Configuration

When adding routes, you can specify the subscription (subscription name) through parameters:

// Add route using default subscription "default"
routerId, err := pulsarEndpoint.AddRouter(router)

// Add route specifying subscription as "my-subscription"
routerId, err := pulsarEndpoint.AddRouter(router, "my-subscription")
1
2
3
4
5

# Response

Before exchange.Out.SetBody response, you need to specify the responseTopic parameter through exchange.Out.Headers() or exchange.Out.Msg.Metadata, and the component will send data to the specified topic:

exchange.Out.GetMsg().Metadata.PutValue("responseTopic", "persistent://public/default/device-response")
// or
exchange.Out.Headers().Add("responseTopic", "persistent://public/default/device-response")

exchange.Out.SetBody([]byte("ok"))
1
2
3
4
5

Response parameter configuration:

Field Type Required Description Default Value
responseTopic string Yes Response topic -

# Message Format

Received Pulsar messages are converted to RuleMsg, containing the following metadata:

  • topic: Message topic
  • messageId: Pulsar message ID
  • publishTime: Message publish time (RFC3339 format)
  • eventTime: Message event time (RFC3339 format)
  • key: Message key (if any)
  • Custom properties of the message are also added to metadata

# Working Principle

  1. Component starts by connecting to Pulsar cluster based on configuration
  2. Creates corresponding consumers for each route using Shared subscription type
  3. Upon receiving messages, converts to RuleMsg and routes to corresponding rule chains for processing
  4. Supports sending processing results through response topics, response messages include original message properties
  5. Automatically acknowledges message processing completion
  6. Supports graceful shutdown, ensuring message processing completion before closing connections

# Advanced Features

# Message Property Propagation

The component automatically adds all custom properties of Pulsar messages to RuleMsg metadata for use in rule chains.

# Response Message Properties

When sending response messages, all fields in response headers are sent as message properties:

// Set response message properties
exchange.Out.Headers().Set("correlationId", "12345")
exchange.Out.Headers().Set("messageType", "response")

// Set response topic and message body
exchange.Out.GetMsg().Metadata.PutValue("responseTopic", "response-topic")
exchange.Out.SetBody([]byte("response data"))
1
2
3
4
5
6
7

# Configuration Example

{
  "server": "pulsar://localhost:6650",
  "subscriptionName": "my-subscription",
  "subscriptionType": "shared",
  "authToken": "your-jwt-token",
  "certFile": "/path/to/cert.pem",
  "certKeyFile": "/path/to/key.pem"
}
1
2
3
4
5
6
7
8

# Examples

Here are example codes using endpoints:

  • RestEndpoint (opens new window)
  • WebsocketEndpoint (opens new window)
  • MqttEndpoint (opens new window)
  • ScheduleEndpoint (opens new window)
  • NetEndpoint (opens new window)
  • PulsarEndpoint (opens new window) (Extension component library)
Edit this page on GitHub (opens new window)
Last Updated: 2025/08/03, 15:15:53
NSQ Endpoint
Support this project

← NSQ Endpoint Support this project→

Theme by Vdoing | Copyright © 2023-2025 RuleGo Team | Apache 2.0 License

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式