初始版本
This commit is contained in:
223
mqttx/mqttx.go
Normal file
223
mqttx/mqttx.go
Normal file
@@ -0,0 +1,223 @@
|
||||
package mqttx
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
)
|
||||
|
||||
type EndpointConfig struct {
|
||||
Host string `json:"host"`
|
||||
Port int `json:"port"`
|
||||
ClientID string `json:"clientId"`
|
||||
Username string `json:"username"`
|
||||
Password string `json:"password"`
|
||||
KeepAlive int `json:"keepAlive"`
|
||||
}
|
||||
type Config struct {
|
||||
Host string `json:"host"`
|
||||
Port int `json:"port"`
|
||||
ClientID string `json:"clientId"`
|
||||
Topic []string `json:"topic"`
|
||||
Username string `json:"username"`
|
||||
Password string `json:"password"`
|
||||
Inner EndpointConfig `json:"inner"`
|
||||
}
|
||||
|
||||
type MessageHandler func(topic string, payload []byte)
|
||||
|
||||
type ConnectOption func(*connectOptions)
|
||||
|
||||
type connectOptions struct {
|
||||
clientIDTimestamp bool
|
||||
}
|
||||
|
||||
func WithClientIDTimestamp() ConnectOption {
|
||||
return func(opts *connectOptions) {
|
||||
opts.clientIDTimestamp = true
|
||||
}
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
client mqtt.Client
|
||||
logger *log.Logger
|
||||
mu sync.RWMutex
|
||||
subs map[string]subscription
|
||||
}
|
||||
|
||||
type subscription struct {
|
||||
qos byte
|
||||
handler MessageHandler
|
||||
}
|
||||
|
||||
func Connect(name string, cfg EndpointConfig, fallbackClientID string, logger *log.Logger, options ...ConnectOption) (*Client, error) {
|
||||
if cfg.Host == "" {
|
||||
return nil, fmt.Errorf("%s mqtt host is empty", name)
|
||||
}
|
||||
if logger == nil {
|
||||
logger = log.Default()
|
||||
}
|
||||
connectOpts := connectOptions{}
|
||||
for _, option := range options {
|
||||
if option != nil {
|
||||
option(&connectOpts)
|
||||
}
|
||||
}
|
||||
clientID := defaultString(cfg.ClientID, fallbackClientID)
|
||||
if connectOpts.clientIDTimestamp {
|
||||
clientID = fmt.Sprintf("%s_%d", clientID, time.Now().UnixMilli())
|
||||
}
|
||||
mc := &Client{logger: logger, subs: map[string]subscription{}}
|
||||
broker := cfg.Host
|
||||
if !strings.HasPrefix(broker, "tcp://") && !strings.HasPrefix(broker, "ssl://") && !strings.HasPrefix(broker, "ws://") {
|
||||
broker = fmt.Sprintf("tcp://%s:%d", cfg.Host, defaultInt(cfg.Port, 1883))
|
||||
}
|
||||
opts := mqtt.NewClientOptions().
|
||||
AddBroker(broker).
|
||||
SetClientID(clientID).
|
||||
SetAutoReconnect(true).
|
||||
SetConnectRetry(true).
|
||||
SetKeepAlive(time.Duration(defaultInt(cfg.KeepAlive, 60)) * time.Second).
|
||||
SetCleanSession(false).
|
||||
SetResumeSubs(true).
|
||||
SetConnectionLostHandler(func(_ mqtt.Client, err error) {
|
||||
logger.Printf("%s mqtt disconnected: %v", name, err)
|
||||
}).
|
||||
SetOnConnectHandler(func(client mqtt.Client) {
|
||||
logger.Printf("%s mqtt connected", name)
|
||||
mc.resubscribe(client)
|
||||
})
|
||||
if cfg.Username != "" {
|
||||
opts.SetUsername(cfg.Username)
|
||||
opts.SetPassword(cfg.Password)
|
||||
}
|
||||
mc.client = mqtt.NewClient(opts)
|
||||
token := mc.client.Connect()
|
||||
if token.WaitTimeout(15*time.Second) && token.Error() != nil {
|
||||
return nil, token.Error()
|
||||
}
|
||||
if !mc.client.IsConnected() {
|
||||
return nil, fmt.Errorf("%s mqtt connect timeout", name)
|
||||
}
|
||||
return mc, nil
|
||||
}
|
||||
|
||||
func (c *Client) Subscribe(topic string, qos byte, handler MessageHandler) error {
|
||||
if topic == "" {
|
||||
return fmt.Errorf("mqtt topic is empty")
|
||||
}
|
||||
if handler == nil {
|
||||
return fmt.Errorf("mqtt handler is nil")
|
||||
}
|
||||
c.mu.Lock()
|
||||
c.subs[topic] = subscription{qos: qos, handler: handler}
|
||||
c.mu.Unlock()
|
||||
if c.client == nil || !c.client.IsConnected() {
|
||||
return nil
|
||||
}
|
||||
err := c.subscribe(topic, qos, handler)
|
||||
if err != nil && isDisconnectedSubscribeError(err) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Client) PublishAsync(topic string, qos byte, retained bool, payload any) error {
|
||||
if c.client == nil {
|
||||
return fmt.Errorf("mqtt client is nil")
|
||||
}
|
||||
c.client.Publish(topic, qos, retained, payload)
|
||||
return nil
|
||||
}
|
||||
func (c *Client) Publish(topic string, qos byte, retained bool, payload any) error {
|
||||
if c.client == nil {
|
||||
return fmt.Errorf("mqtt client is nil")
|
||||
}
|
||||
token := c.client.Publish(topic, qos, retained, payload)
|
||||
if !token.WaitTimeout(15 * time.Second) {
|
||||
return fmt.Errorf("publish %s timeout", topic)
|
||||
}
|
||||
return token.Error()
|
||||
}
|
||||
|
||||
func (c *Client) Unsubscribe(topic string) error {
|
||||
if topic == "" {
|
||||
return fmt.Errorf("mqtt topic is empty")
|
||||
}
|
||||
c.mu.Lock()
|
||||
delete(c.subs, topic)
|
||||
c.mu.Unlock()
|
||||
if c.client == nil || !c.client.IsConnected() {
|
||||
return nil
|
||||
}
|
||||
token := c.client.Unsubscribe(topic)
|
||||
if !token.WaitTimeout(15 * time.Second) {
|
||||
return fmt.Errorf("unsubscribe %s timeout", topic)
|
||||
}
|
||||
return token.Error()
|
||||
}
|
||||
|
||||
func (c *Client) IsConnected() bool {
|
||||
return c.client != nil && c.client.IsConnected()
|
||||
}
|
||||
|
||||
func (c *Client) Disconnect(quiesce uint) {
|
||||
if c.client != nil && c.client.IsConnected() {
|
||||
c.client.Disconnect(quiesce)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) resubscribe(client mqtt.Client) {
|
||||
c.mu.RLock()
|
||||
subs := make(map[string]subscription, len(c.subs))
|
||||
for topic, sub := range c.subs {
|
||||
subs[topic] = sub
|
||||
}
|
||||
c.mu.RUnlock()
|
||||
for topic, sub := range subs {
|
||||
if err := subscribeClient(client, topic, sub.qos, sub.handler); err != nil {
|
||||
c.logger.Printf("mqtt resubscribe %s failed: %v", topic, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) subscribe(topic string, qos byte, handler MessageHandler) error {
|
||||
return subscribeClient(c.client, topic, qos, handler)
|
||||
}
|
||||
|
||||
func subscribeClient(client mqtt.Client, topic string, qos byte, handler MessageHandler) error {
|
||||
token := client.Subscribe(topic, qos, func(_ mqtt.Client, msg mqtt.Message) {
|
||||
payload := append([]byte(nil), msg.Payload()...)
|
||||
handler(msg.Topic(), payload)
|
||||
})
|
||||
if !token.WaitTimeout(15 * time.Second) {
|
||||
return fmt.Errorf("subscribe %s timeout", topic)
|
||||
}
|
||||
return token.Error()
|
||||
}
|
||||
|
||||
func isDisconnectedSubscribeError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
msg := err.Error()
|
||||
return strings.Contains(msg, "not currently connected") || strings.Contains(msg, "connection lost")
|
||||
}
|
||||
|
||||
func defaultInt(v, fallback int) int {
|
||||
if v == 0 {
|
||||
return fallback
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
func defaultString(v, fallback string) string {
|
||||
if v == "" {
|
||||
return fallback
|
||||
}
|
||||
return v
|
||||
}
|
||||
75
mqttx/queue.go
Normal file
75
mqttx/queue.go
Normal file
@@ -0,0 +1,75 @@
|
||||
package mqttx
|
||||
|
||||
import "sync"
|
||||
|
||||
type MessageQueue[T any] struct {
|
||||
mu sync.Mutex
|
||||
cond *sync.Cond
|
||||
items []T
|
||||
closed bool
|
||||
}
|
||||
|
||||
func NewMessageQueue[T any]() *MessageQueue[T] {
|
||||
q := &MessageQueue[T]{}
|
||||
q.cond = sync.NewCond(&q.mu)
|
||||
return q
|
||||
}
|
||||
|
||||
func (q *MessageQueue[T]) Push(item T) bool {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
if q.closed {
|
||||
return false
|
||||
}
|
||||
q.items = append(q.items, item)
|
||||
q.cond.Signal()
|
||||
return true
|
||||
}
|
||||
|
||||
func (q *MessageQueue[T]) Pop() (T, bool) {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
for len(q.items) == 0 && !q.closed {
|
||||
q.cond.Wait()
|
||||
}
|
||||
var zero T
|
||||
if len(q.items) == 0 {
|
||||
return zero, false
|
||||
}
|
||||
item := q.items[0]
|
||||
q.items[0] = zero
|
||||
q.items = q.items[1:]
|
||||
return item, true
|
||||
}
|
||||
|
||||
func (q *MessageQueue[T]) Drain(max int) []T {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
if max <= 0 || len(q.items) == 0 {
|
||||
return nil
|
||||
}
|
||||
if max > len(q.items) {
|
||||
max = len(q.items)
|
||||
}
|
||||
items := make([]T, max)
|
||||
copy(items, q.items[:max])
|
||||
var zero T
|
||||
for i := 0; i < max; i++ {
|
||||
q.items[i] = zero
|
||||
}
|
||||
q.items = q.items[max:]
|
||||
return items
|
||||
}
|
||||
|
||||
func (q *MessageQueue[T]) Len() int {
|
||||
q.mu.Lock()
|
||||
defer q.mu.Unlock()
|
||||
return len(q.items)
|
||||
}
|
||||
|
||||
func (q *MessageQueue[T]) Close() {
|
||||
q.mu.Lock()
|
||||
q.closed = true
|
||||
q.mu.Unlock()
|
||||
q.cond.Broadcast()
|
||||
}
|
||||
Reference in New Issue
Block a user