新增多语言转换 mqtt支持websocket

This commit is contained in:
2025-05-29 11:48:46 +08:00
parent cff5448fe0
commit 069ca209d0
7 changed files with 182 additions and 35 deletions

View File

@@ -7,8 +7,35 @@ import 'package:mqtt5_client/mqtt5_server_client.dart';
class Mqtt {
final MqttConfig _config;
MqttClient? _client;
//标记是否自动重连
bool reconnect = true;
//缓存自定义主题订阅信息
Map<String, int> topicCache = {};
Mqtt(this._config);
Mqtt(this._config) {
_client = MqttServerClient.withPort(
_config.host,
"${_config.clientId}_${DateTime.now().millisecondsSinceEpoch}",
_config.port)
..useWebSocket =
_config.host.startsWith("ws") || _config.host.startsWith("wss");
_client?.keepAlivePeriod = _config.keepAlive;
_client?.connectionMessage =
MqttConnectMessage().keepAliveFor(_config.keepAlive);
// _client?.autoReconnect = true;
_client?.onConnected = () {
// print("aa:mqtt服务器连接成功");
print("mqtt服务器: ${_config.host} 连接成功");
};
_client?.onDisconnected = () {
// print("aa:mqtt服务器连接断开");
print("mqtt服务器: ${_config.host} 连接断开");
//执行重连操作
if (reconnect) {
connect(reconnect: true);
}
};
}
static late Mqtt _mqtt;
@@ -20,30 +47,26 @@ class Mqtt {
_mqtt = server;
}
Future<bool> connect() async {
if (_client != null) {
return true;
Future<bool> connect({bool reconnect = false}) async {
if (reconnect) {
//限制重连速率
await Future.delayed(Duration(seconds: 1));
print("mqtt服务器: ${_config.host} 尝试重连");
}
_client = MqttServerClient.withPort(
_config.host,
"${_config.clientId}_${DateTime.now().millisecondsSinceEpoch}",
_config.port);
_client?.autoReconnect = true;
MqttConnectionStatus? state;
try {
state = await _client?.connect(_config.username, _config.password);
} catch (e) {
//连接错误
print("mqtt连接失败: 地址:${_config.host} 原因:$e");
return false;
return connect(reconnect: this.reconnect);
}
if (state!.state != MqttConnectionState.connected) {
if (state == null || state.state != MqttConnectionState.connected) {
//连接失败
print("mqtt连接失败: 地址:${_config.host} 原因:$state");
return false;
print("mqtt连接失败-1: 地址:${_config.host} 原因:$state");
return connect(reconnect: this.reconnect);
}
print("mqtt服务器: ${_config.host} 连接成功");
// print("mqtt服务器: ${_config.host} 连接成功");
_client?.updates.listen((List<MqttReceivedMessage<MqttMessage>> message) {
final recMess = message[0].payload as MqttPublishMessage;
@@ -63,7 +86,10 @@ class Mqtt {
}
});
_config.topic?.forEach((topic) {
subscribe(topic, _config.qos);
subscribe(topic, _config.qos, cache: false);
});
topicCache.forEach((t, q) {
subscribe(t, q, cache: false);
});
return true;
@@ -71,22 +97,33 @@ class Mqtt {
void disconnect() {
_client?.disconnect();
_client = null;
reconnect = false;
// _client = null;
}
void subscribe(String topic, int qos) {
void subscribe(String topic, int qos, {bool cache = true}) {
if (cache) {
topicCache[topic] = qos;
}
_client?.subscribe(topic, MqttUtilities.getQosLevel(qos));
}
void unSubscribe(String topic) {
topicCache.remove(topic);
_client?.unsubscribeStringTopic(topic);
}
void publish(String topic, String msg, {int qos = 0}) {
var payload = MqttPayloadBuilder();
payload.addUTF8String(msg);
_client?.publishMessage(
topic, MqttUtilities.getQosLevel(qos), payload.payload!);
Future<bool> publish(String topic, String msg, {int qos = 0}) async {
try {
var payload = MqttPayloadBuilder();
payload.addUTF8String(msg);
_client?.publishMessage(
topic, MqttUtilities.getQosLevel(qos), payload.payload!);
return true;
} catch (e) {
print(e);
return false;
}
}
}
@@ -97,6 +134,7 @@ class MqttConfig {
String? username;
String? password;
List<String>? topic;
int keepAlive;
int qos;
//文本消息
Function(String topic, String message)? messgae;
@@ -112,5 +150,6 @@ class MqttConfig {
this.topic,
this.qos = 0,
this.username,
this.password});
this.password,
this.keepAlive = 60});
}