commit b759ceb1c651718a6bfe03ff25dc6e8bffac5a94 Author: qmqz Date: Wed May 6 14:53:45 2026 +0800 初始版本 diff --git a/convx/convx.go b/convx/convx.go new file mode 100644 index 0000000..01acb9a --- /dev/null +++ b/convx/convx.go @@ -0,0 +1,43 @@ +package convx + +import ( + "encoding/json" + "strconv" +) + +func Int(v any) int { + return int(Int64(v)) +} + +func Int64(v any) int64 { + switch n := v.(type) { + case int: + return int64(n) + case int64: + return n + case float64: + return int64(n) + case json.Number: + i, _ := n.Int64() + return i + case string: + i, _ := strconv.ParseInt(n, 10, 64) + return i + default: + return 0 + } +} + +func ValueOrZero(v, invalid int) int { + if v == invalid { + return 0 + } + return v +} + +func Ternary[T any](cond bool, yes, no T) T { + if cond { + return yes + } + return no +} diff --git a/envx/envx.go b/envx/envx.go new file mode 100644 index 0000000..93c85af --- /dev/null +++ b/envx/envx.go @@ -0,0 +1,128 @@ +package envx + +import ( + "net" + "os" + "strconv" + "strings" +) + +func String(key, fallback string) string { + if v := os.Getenv(key); v != "" { + return v + } + return fallback +} + +func Int(key string, fallback int) int { + if v := os.Getenv(key); v != "" { + if i, err := strconv.Atoi(v); err == nil { + return i + } + } + return fallback +} + +func LocalIP() string { + for _, key := range []string{"nacos_bindIp", "NACOS_BIND_IP", "LOCAL_IP", "BIND_IP"} { + if ip := validIPv4(os.Getenv(key)); ip != "" { + return ip + } + } + if ip := hostnameIP(); ip != "" { + return ip + } + if ip := interfaceIP(); ip != "" { + return ip + } + if ip := outboundIP(); ip != "" { + return ip + } + return "127.0.0.1" +} + +func hostnameIP() string { + hostname, err := os.Hostname() + if err != nil || hostname == "" { + return "" + } + addrs, err := net.LookupIP(hostname) + if err != nil { + return "" + } + for _, addr := range addrs { + if ip := validIPv4(addr.String()); ip != "" { + return ip + } + } + return "" +} + +func interfaceIP() string { + ifaces, err := net.Interfaces() + if err != nil { + return "" + } + bestIP := "" + bestScore := -1 + for _, iface := range ifaces { + if iface.Flags&net.FlagUp == 0 || iface.Flags&net.FlagLoopback != 0 { + continue + } + addrs, err := iface.Addrs() + if err != nil { + continue + } + for _, addr := range addrs { + ipNet, ok := addr.(*net.IPNet) + if !ok { + continue + } + ip := validIPv4(ipNet.IP.String()) + if ip == "" { + continue + } + score := interfaceScore(iface.Name) + if score > bestScore { + bestIP = ip + bestScore = score + } + } + } + return bestIP +} + +func interfaceScore(name string) int { + name = strings.ToLower(name) + switch { + case name == "eth0": + return 100 + case strings.HasPrefix(name, "eth"): + return 90 + case strings.HasPrefix(name, "en"), strings.HasPrefix(name, "wl"): + return 80 + case strings.Contains(name, "docker"), strings.HasPrefix(name, "br-"), strings.HasPrefix(name, "veth"): + return 10 + default: + return 50 + } +} + +func outboundIP() string { + conn, err := net.Dial("udp", "8.8.8.8:80") + if err == nil { + defer conn.Close() + if addr, ok := conn.LocalAddr().(*net.UDPAddr); ok { + return validIPv4(addr.IP.String()) + } + } + return "" +} + +func validIPv4(value string) string { + ip := net.ParseIP(strings.TrimSpace(value)).To4() + if ip == nil || ip.IsLoopback() || ip.IsUnspecified() || ip.IsLinkLocalUnicast() { + return "" + } + return ip.String() +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..d60c7c8 --- /dev/null +++ b/go.mod @@ -0,0 +1,27 @@ +module easy_go_module + +go 1.24.0 + +require ( + github.com/eclipse/paho.mqtt.golang v1.5.1 + github.com/redis/go-redis/v9 v9.19.0 + go.mongodb.org/mongo-driver v1.17.9 +) + +require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/gorilla/websocket v1.5.3 // indirect + github.com/klauspost/compress v1.16.7 // indirect + github.com/montanaflynn/stats v0.7.1 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect + github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect + go.uber.org/atomic v1.11.0 // indirect + golang.org/x/crypto v0.42.0 // indirect + golang.org/x/net v0.44.0 // indirect + golang.org/x/sync v0.17.0 // indirect + golang.org/x/text v0.29.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..bbd39e7 --- /dev/null +++ b/go.sum @@ -0,0 +1,71 @@ +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk= +github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE= +github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= +github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= +github.com/redis/go-redis/v9 v9.7.0/go.mod h1:f6zhXITC7JUJIlPEiBOTXxJgPLdZcA93GewI7inzyWw= +github.com/redis/go-redis/v9 v9.19.0 h1:XPVaaPSnG6RhYf7p+rmSa9zZfeVAnWsH5h3lxthOm/k= +github.com/redis/go-redis/v9 v9.19.0/go.mod h1:v/M13XI1PVCDcm01VtPFOADfZtHf8YW3baQf57KlIkA= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM= +github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.mongodb.org/mongo-driver v1.17.1/go.mod h1:wwWm/+BuOddhcq3n68LKRmgk2wXzmF6s0SFOa0GINL4= +go.mongodb.org/mongo-driver v1.17.9 h1:IexDdCuuNJ3BHrELgBlyaH9p60JXAvdzWR128q+U5tU= +go.mongodb.org/mongo-driver v1.17.9/go.mod h1:LlOhpH5NUEfhxcAwG0UEkMqwYcc4JU18gtCdGudk/tQ= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= +golang.org/x/crypto v0.42.0 h1:chiH31gIWm57EkTXpwnqf8qeuMUi0yekh6mT2AvFlqI= +golang.org/x/crypto v0.42.0/go.mod h1:4+rDnOTJhQCx2q7/j6rAN5XDw8kPjeaXEUR2eL94ix8= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I= +golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= +golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/httpx/httpx.go b/httpx/httpx.go new file mode 100644 index 0000000..4463b51 --- /dev/null +++ b/httpx/httpx.go @@ -0,0 +1,60 @@ +package httpx + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "time" +) + +type Client struct { + HTTP *http.Client +} + +func New(timeout time.Duration) *Client { + return &Client{HTTP: &http.Client{Timeout: timeout}} +} + +func Wrap(client *http.Client) *Client { + if client == nil { + client = http.DefaultClient + } + return &Client{HTTP: client} +} + +func (c *Client) GetBytes(target string) ([]byte, int, error) { + resp, err := c.HTTP.Get(target) + if err != nil { + return nil, 0, err + } + defer resp.Body.Close() + body, err := io.ReadAll(resp.Body) + return body, resp.StatusCode, err +} + +func (c *Client) PostJSON(target string, data any) ([]byte, int, error) { + body, err := json.Marshal(data) + if err != nil { + return nil, 0, err + } + resp, err := c.HTTP.Post(target, "application/json", bytes.NewReader(body)) + if err != nil { + return nil, 0, err + } + defer resp.Body.Close() + respBody, err := io.ReadAll(resp.Body) + return respBody, resp.StatusCode, err +} + +func MustOK(body []byte, status int, err error) error { + if err != nil { + return err + } + if status >= http.StatusMultipleChoices { + return fmt.Errorf("http status=%d body=%s", status, strings.TrimSpace(string(body))) + } + return nil +} diff --git a/mongox/mongox.go b/mongox/mongox.go new file mode 100644 index 0000000..8fe154d --- /dev/null +++ b/mongox/mongox.go @@ -0,0 +1,37 @@ +package mongox + +import ( + "context" + "fmt" + "strings" + + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +type Config struct { + Host string `json:"host"` + UserName string `json:"userName"` + Password string `json:"password"` + Database string `json:"dataBase"` +} + +func Connect(ctx context.Context, cfg Config) (*mongo.Client, *mongo.Database, error) { + uri := cfg.Host + if !strings.HasPrefix(uri, "mongodb://") && !strings.HasPrefix(uri, "mongodb+srv://") { + uri = "mongodb://" + uri + } + opts := options.Client().ApplyURI(uri) + if cfg.UserName != "" { + opts.SetAuth(options.Credential{Username: cfg.UserName, Password: cfg.Password}) + } + client, err := mongo.Connect(ctx, opts) + if err != nil { + return nil, nil, fmt.Errorf("connect mongodb: %w", err) + } + if err := client.Ping(ctx, nil); err != nil { + _ = client.Disconnect(ctx) + return nil, nil, fmt.Errorf("ping mongodb: %w", err) + } + return client, client.Database(cfg.Database), nil +} diff --git a/mqttx/mqttx.go b/mqttx/mqttx.go new file mode 100644 index 0000000..0a7b3a6 --- /dev/null +++ b/mqttx/mqttx.go @@ -0,0 +1,223 @@ +package mqttx + +import ( + "fmt" + "log" + "strings" + "sync" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +type EndpointConfig struct { + Host string `json:"host"` + Port int `json:"port"` + ClientID string `json:"clientId"` + Username string `json:"username"` + Password string `json:"password"` + KeepAlive int `json:"keepAlive"` +} +type Config struct { + Host string `json:"host"` + Port int `json:"port"` + ClientID string `json:"clientId"` + Topic []string `json:"topic"` + Username string `json:"username"` + Password string `json:"password"` + Inner EndpointConfig `json:"inner"` +} + +type MessageHandler func(topic string, payload []byte) + +type ConnectOption func(*connectOptions) + +type connectOptions struct { + clientIDTimestamp bool +} + +func WithClientIDTimestamp() ConnectOption { + return func(opts *connectOptions) { + opts.clientIDTimestamp = true + } +} + +type Client struct { + client mqtt.Client + logger *log.Logger + mu sync.RWMutex + subs map[string]subscription +} + +type subscription struct { + qos byte + handler MessageHandler +} + +func Connect(name string, cfg EndpointConfig, fallbackClientID string, logger *log.Logger, options ...ConnectOption) (*Client, error) { + if cfg.Host == "" { + return nil, fmt.Errorf("%s mqtt host is empty", name) + } + if logger == nil { + logger = log.Default() + } + connectOpts := connectOptions{} + for _, option := range options { + if option != nil { + option(&connectOpts) + } + } + clientID := defaultString(cfg.ClientID, fallbackClientID) + if connectOpts.clientIDTimestamp { + clientID = fmt.Sprintf("%s_%d", clientID, time.Now().UnixMilli()) + } + mc := &Client{logger: logger, subs: map[string]subscription{}} + broker := cfg.Host + if !strings.HasPrefix(broker, "tcp://") && !strings.HasPrefix(broker, "ssl://") && !strings.HasPrefix(broker, "ws://") { + broker = fmt.Sprintf("tcp://%s:%d", cfg.Host, defaultInt(cfg.Port, 1883)) + } + opts := mqtt.NewClientOptions(). + AddBroker(broker). + SetClientID(clientID). + SetAutoReconnect(true). + SetConnectRetry(true). + SetKeepAlive(time.Duration(defaultInt(cfg.KeepAlive, 60)) * time.Second). + SetCleanSession(false). + SetResumeSubs(true). + SetConnectionLostHandler(func(_ mqtt.Client, err error) { + logger.Printf("%s mqtt disconnected: %v", name, err) + }). + SetOnConnectHandler(func(client mqtt.Client) { + logger.Printf("%s mqtt connected", name) + mc.resubscribe(client) + }) + if cfg.Username != "" { + opts.SetUsername(cfg.Username) + opts.SetPassword(cfg.Password) + } + mc.client = mqtt.NewClient(opts) + token := mc.client.Connect() + if token.WaitTimeout(15*time.Second) && token.Error() != nil { + return nil, token.Error() + } + if !mc.client.IsConnected() { + return nil, fmt.Errorf("%s mqtt connect timeout", name) + } + return mc, nil +} + +func (c *Client) Subscribe(topic string, qos byte, handler MessageHandler) error { + if topic == "" { + return fmt.Errorf("mqtt topic is empty") + } + if handler == nil { + return fmt.Errorf("mqtt handler is nil") + } + c.mu.Lock() + c.subs[topic] = subscription{qos: qos, handler: handler} + c.mu.Unlock() + if c.client == nil || !c.client.IsConnected() { + return nil + } + err := c.subscribe(topic, qos, handler) + if err != nil && isDisconnectedSubscribeError(err) { + return nil + } + return err +} + +func (c *Client) PublishAsync(topic string, qos byte, retained bool, payload any) error { + if c.client == nil { + return fmt.Errorf("mqtt client is nil") + } + c.client.Publish(topic, qos, retained, payload) + return nil +} +func (c *Client) Publish(topic string, qos byte, retained bool, payload any) error { + if c.client == nil { + return fmt.Errorf("mqtt client is nil") + } + token := c.client.Publish(topic, qos, retained, payload) + if !token.WaitTimeout(15 * time.Second) { + return fmt.Errorf("publish %s timeout", topic) + } + return token.Error() +} + +func (c *Client) Unsubscribe(topic string) error { + if topic == "" { + return fmt.Errorf("mqtt topic is empty") + } + c.mu.Lock() + delete(c.subs, topic) + c.mu.Unlock() + if c.client == nil || !c.client.IsConnected() { + return nil + } + token := c.client.Unsubscribe(topic) + if !token.WaitTimeout(15 * time.Second) { + return fmt.Errorf("unsubscribe %s timeout", topic) + } + return token.Error() +} + +func (c *Client) IsConnected() bool { + return c.client != nil && c.client.IsConnected() +} + +func (c *Client) Disconnect(quiesce uint) { + if c.client != nil && c.client.IsConnected() { + c.client.Disconnect(quiesce) + } +} + +func (c *Client) resubscribe(client mqtt.Client) { + c.mu.RLock() + subs := make(map[string]subscription, len(c.subs)) + for topic, sub := range c.subs { + subs[topic] = sub + } + c.mu.RUnlock() + for topic, sub := range subs { + if err := subscribeClient(client, topic, sub.qos, sub.handler); err != nil { + c.logger.Printf("mqtt resubscribe %s failed: %v", topic, err) + } + } +} + +func (c *Client) subscribe(topic string, qos byte, handler MessageHandler) error { + return subscribeClient(c.client, topic, qos, handler) +} + +func subscribeClient(client mqtt.Client, topic string, qos byte, handler MessageHandler) error { + token := client.Subscribe(topic, qos, func(_ mqtt.Client, msg mqtt.Message) { + payload := append([]byte(nil), msg.Payload()...) + handler(msg.Topic(), payload) + }) + if !token.WaitTimeout(15 * time.Second) { + return fmt.Errorf("subscribe %s timeout", topic) + } + return token.Error() +} + +func isDisconnectedSubscribeError(err error) bool { + if err == nil { + return false + } + msg := err.Error() + return strings.Contains(msg, "not currently connected") || strings.Contains(msg, "connection lost") +} + +func defaultInt(v, fallback int) int { + if v == 0 { + return fallback + } + return v +} + +func defaultString(v, fallback string) string { + if v == "" { + return fallback + } + return v +} diff --git a/mqttx/queue.go b/mqttx/queue.go new file mode 100644 index 0000000..cb45236 --- /dev/null +++ b/mqttx/queue.go @@ -0,0 +1,75 @@ +package mqttx + +import "sync" + +type MessageQueue[T any] struct { + mu sync.Mutex + cond *sync.Cond + items []T + closed bool +} + +func NewMessageQueue[T any]() *MessageQueue[T] { + q := &MessageQueue[T]{} + q.cond = sync.NewCond(&q.mu) + return q +} + +func (q *MessageQueue[T]) Push(item T) bool { + q.mu.Lock() + defer q.mu.Unlock() + if q.closed { + return false + } + q.items = append(q.items, item) + q.cond.Signal() + return true +} + +func (q *MessageQueue[T]) Pop() (T, bool) { + q.mu.Lock() + defer q.mu.Unlock() + for len(q.items) == 0 && !q.closed { + q.cond.Wait() + } + var zero T + if len(q.items) == 0 { + return zero, false + } + item := q.items[0] + q.items[0] = zero + q.items = q.items[1:] + return item, true +} + +func (q *MessageQueue[T]) Drain(max int) []T { + q.mu.Lock() + defer q.mu.Unlock() + if max <= 0 || len(q.items) == 0 { + return nil + } + if max > len(q.items) { + max = len(q.items) + } + items := make([]T, max) + copy(items, q.items[:max]) + var zero T + for i := 0; i < max; i++ { + q.items[i] = zero + } + q.items = q.items[max:] + return items +} + +func (q *MessageQueue[T]) Len() int { + q.mu.Lock() + defer q.mu.Unlock() + return len(q.items) +} + +func (q *MessageQueue[T]) Close() { + q.mu.Lock() + q.closed = true + q.mu.Unlock() + q.cond.Broadcast() +} diff --git a/nacosx/nacosx.go b/nacosx/nacosx.go new file mode 100644 index 0000000..9a78ffe --- /dev/null +++ b/nacosx/nacosx.go @@ -0,0 +1,122 @@ +package nacosx + +import ( + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "net/url" + "strconv" + "strings" + "time" +) + +type Config struct { + Server string + NamespaceID string + BindIP string + Timeout time.Duration +} + +type Client struct { + baseURL string + namespaceID string + bindIP string + client *http.Client +} + +func New(cfg Config) *Client { + timeout := cfg.Timeout + if timeout == 0 { + timeout = 15 * time.Second + } + return &Client{ + baseURL: strings.TrimRight(cfg.Server, "/"), + namespaceID: cfg.NamespaceID, + bindIP: cfg.BindIP, + client: &http.Client{Timeout: timeout}, + } +} + +func (c *Client) BaseURL() string { + return c.baseURL +} + +func (c *Client) BindIP() string { + return c.bindIP +} + +func (c *Client) ConfigJSON(dataID string, dst any) error { + body, err := c.GetConfig(dataID) + if err != nil { + return err + } + return json.Unmarshal(body, dst) +} + +func (c *Client) GetConfig(dataID string) ([]byte, error) { + u, _ := url.Parse(c.baseURL + "/nacos/v1/cs/configs") + q := u.Query() + q.Set("dataId", dataID) + q.Set("group", "DEFAULT_GROUP") + q.Set("tenant", c.namespaceID) + u.RawQuery = q.Encode() + + resp, err := c.client.Get(u.String()) + if err != nil { + return nil, err + } + defer resp.Body.Close() + body, _ := io.ReadAll(resp.Body) + if resp.StatusCode >= http.StatusMultipleChoices { + return nil, fmt.Errorf("nacos config %s status=%d body=%s", dataID, resp.StatusCode, string(body)) + } + return body, nil +} + +func (c *Client) Register(serviceName string, port int) error { + values := url.Values{} + values.Set("serviceName", serviceName) + values.Set("ip", c.bindIP) + values.Set("port", strconv.Itoa(port)) + values.Set("namespaceId", c.namespaceID) + values.Set("ephemeral", "true") + resp, err := c.client.PostForm(c.baseURL+"/nacos/v1/ns/instance", values) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode >= http.StatusMultipleChoices { + body, _ := io.ReadAll(resp.Body) + return fmt.Errorf("status=%d body=%s", resp.StatusCode, string(body)) + } + return nil +} + +func (c *Client) Heartbeat(serviceName string, port int) { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + for range ticker.C { + beat, _ := json.Marshal(map[string]any{"serviceName": serviceName, "ip": c.bindIP, "port": port, "ephemeral": true}) + u, _ := url.Parse(c.baseURL + "/nacos/v1/ns/instance/beat") + q := u.Query() + q.Set("serviceName", serviceName) + q.Set("ip", c.bindIP) + q.Set("port", strconv.Itoa(port)) + q.Set("namespaceId", c.namespaceID) + q.Set("beat", string(beat)) + u.RawQuery = q.Encode() + req, _ := http.NewRequest(http.MethodPut, u.String(), nil) + resp, err := c.client.Do(req) + if err != nil { + log.Printf("nacos heartbeat failed: %v", err) + continue + } + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + if resp.StatusCode >= http.StatusMultipleChoices { + log.Printf("nacos heartbeat status=%d", resp.StatusCode) + } + } +} diff --git a/redisx/redisx.go b/redisx/redisx.go new file mode 100644 index 0000000..3bd3890 --- /dev/null +++ b/redisx/redisx.go @@ -0,0 +1,28 @@ +package redisx + +import ( + "context" + "fmt" + + "github.com/redis/go-redis/v9" +) + +type Config struct { + Host string `json:"host"` + Port int `json:"port"` +} + +func Connect(ctx context.Context, cfg Config) (*redis.Client, error) { + client := redis.NewClient(&redis.Options{Addr: fmt.Sprintf("%s:%d", cfg.Host, defaultInt(cfg.Port, 6379))}) + if err := client.Ping(ctx).Err(); err != nil { + return client, err + } + return client, nil +} + +func defaultInt(v, fallback int) int { + if v == 0 { + return fallback + } + return v +}