package mqttx import ( "fmt" "log" "strings" "sync" "time" mqtt "github.com/eclipse/paho.mqtt.golang" ) type EndpointConfig struct { Host string `json:"host"` Port int `json:"port"` ClientID string `json:"clientId"` Username string `json:"username"` Password string `json:"password"` KeepAlive int `json:"keepAlive"` } type Config struct { Host string `json:"host"` Port int `json:"port"` ClientID string `json:"clientId"` Topic []string `json:"topic"` Username string `json:"username"` Password string `json:"password"` Inner EndpointConfig `json:"inner"` } type MessageHandler func(topic string, payload []byte) type ConnectOption func(*connectOptions) type connectOptions struct { clientIDTimestamp bool } func WithClientIDTimestamp() ConnectOption { return func(opts *connectOptions) { opts.clientIDTimestamp = true } } type Client struct { client mqtt.Client logger *log.Logger mu sync.RWMutex subs map[string]subscription } type subscription struct { qos byte handler MessageHandler } func Connect(name string, cfg EndpointConfig, fallbackClientID string, logger *log.Logger, options ...ConnectOption) (*Client, error) { if cfg.Host == "" { return nil, fmt.Errorf("%s mqtt host is empty", name) } if logger == nil { logger = log.Default() } connectOpts := connectOptions{} for _, option := range options { if option != nil { option(&connectOpts) } } clientID := defaultString(cfg.ClientID, fallbackClientID) if connectOpts.clientIDTimestamp { clientID = fmt.Sprintf("%s_%d", clientID, time.Now().UnixMilli()) } mc := &Client{logger: logger, subs: map[string]subscription{}} broker := cfg.Host if !strings.HasPrefix(broker, "tcp://") && !strings.HasPrefix(broker, "ssl://") && !strings.HasPrefix(broker, "ws://") { broker = fmt.Sprintf("tcp://%s:%d", cfg.Host, defaultInt(cfg.Port, 1883)) } opts := mqtt.NewClientOptions(). AddBroker(broker). SetClientID(clientID). SetAutoReconnect(true). SetConnectRetry(true). SetKeepAlive(time.Duration(defaultInt(cfg.KeepAlive, 60)) * time.Second). SetCleanSession(false). SetResumeSubs(true). SetConnectionLostHandler(func(_ mqtt.Client, err error) { logger.Printf("%s mqtt disconnected: %v", name, err) }). SetOnConnectHandler(func(client mqtt.Client) { logger.Printf("%s mqtt connected", name) mc.resubscribe(client) }) if cfg.Username != "" { opts.SetUsername(cfg.Username) opts.SetPassword(cfg.Password) } mc.client = mqtt.NewClient(opts) token := mc.client.Connect() if token.WaitTimeout(15*time.Second) && token.Error() != nil { return nil, token.Error() } if !mc.client.IsConnected() { return nil, fmt.Errorf("%s mqtt connect timeout", name) } return mc, nil } func (c *Client) Subscribe(topic string, qos byte, handler MessageHandler) error { if topic == "" { return fmt.Errorf("mqtt topic is empty") } if handler == nil { return fmt.Errorf("mqtt handler is nil") } c.mu.Lock() c.subs[topic] = subscription{qos: qos, handler: handler} c.mu.Unlock() if c.client == nil || !c.client.IsConnected() { return nil } err := c.subscribe(topic, qos, handler) if err != nil && isDisconnectedSubscribeError(err) { return nil } return err } func (c *Client) PublishAsync(topic string, qos byte, retained bool, payload any) error { if c.client == nil { return fmt.Errorf("mqtt client is nil") } c.client.Publish(topic, qos, retained, payload) return nil } func (c *Client) Publish(topic string, qos byte, retained bool, payload any) error { if c.client == nil { return fmt.Errorf("mqtt client is nil") } token := c.client.Publish(topic, qos, retained, payload) if !token.WaitTimeout(15 * time.Second) { return fmt.Errorf("publish %s timeout", topic) } return token.Error() } func (c *Client) Unsubscribe(topic string) error { if topic == "" { return fmt.Errorf("mqtt topic is empty") } c.mu.Lock() delete(c.subs, topic) c.mu.Unlock() if c.client == nil || !c.client.IsConnected() { return nil } token := c.client.Unsubscribe(topic) if !token.WaitTimeout(15 * time.Second) { return fmt.Errorf("unsubscribe %s timeout", topic) } return token.Error() } func (c *Client) IsConnected() bool { return c.client != nil && c.client.IsConnected() } func (c *Client) Disconnect(quiesce uint) { if c.client != nil && c.client.IsConnected() { c.client.Disconnect(quiesce) } } func (c *Client) resubscribe(client mqtt.Client) { c.mu.RLock() subs := make(map[string]subscription, len(c.subs)) for topic, sub := range c.subs { subs[topic] = sub } c.mu.RUnlock() for topic, sub := range subs { if err := subscribeClient(client, topic, sub.qos, sub.handler); err != nil { c.logger.Printf("mqtt resubscribe %s failed: %v", topic, err) } } } func (c *Client) subscribe(topic string, qos byte, handler MessageHandler) error { return subscribeClient(c.client, topic, qos, handler) } func subscribeClient(client mqtt.Client, topic string, qos byte, handler MessageHandler) error { token := client.Subscribe(topic, qos, func(_ mqtt.Client, msg mqtt.Message) { payload := append([]byte(nil), msg.Payload()...) handler(msg.Topic(), payload) }) if !token.WaitTimeout(15 * time.Second) { return fmt.Errorf("subscribe %s timeout", topic) } return token.Error() } func isDisconnectedSubscribeError(err error) bool { if err == nil { return false } msg := err.Error() return strings.Contains(msg, "not currently connected") || strings.Contains(msg, "connection lost") } func defaultInt(v, fallback int) int { if v == 0 { return fallback } return v } func defaultString(v, fallback string) string { if v == "" { return fallback } return v }