import 'dart:async';
import 'dart:convert';
import 'dart:typed_data';

import 'package:mqtt5_client/mqtt5_client.dart';
import 'package:mqtt5_client/mqtt5_server_client.dart';
import 'package:typed_data/src/typed_buffer.dart';

/// Wrapper around an [MqttServerClient] that connects with the settings in
/// an [MqttConfig], retries failed connections, re-subscribes after every
/// successful connect and forwards incoming publishes to the configured
/// callbacks.
class Mqtt {
  final MqttConfig _config;
  final MqttServerClient _client;

  /// Whether a new connection should be attempted after the connection drops
  /// or a connection attempt fails.
  ///
  /// [disconnect] sets this to `false`.
  bool reconnect = true;

  /// Topics subscribed through [subscribe] (with `cache: true`), mapped to
  /// their QoS value, so they can be re-subscribed after a reconnect.
  Map<String, int> topicCache = {};

  /// Listener on the client's update stream; replaced on every successful
  /// connect so listeners never pile up.
  StreamSubscription<List<MqttReceivedMessage<MqttMessage>>>? _updates;

  /// The connect sequence currently in flight, if any.
  Future<bool>? _pendingConnect;

  static Mqtt? _mqtt;

  /// Creates the underlying client for [_config] and wires up the
  /// connected / disconnected callbacks. No connection is opened until
  /// [connect] is called.
  Mqtt(this._config)
      : _client = MqttServerClient.withPort(
          _config.host,
          // The timestamp suffix makes the client id differ per instance.
          "${_config.clientId}_${DateTime.now().millisecondsSinceEpoch}",
          _config.port,
        ) {
    _client
      ..useWebSocket = _isWebSocketUrl(_config.host)
      ..keepAlivePeriod = _config.keepAlive
      ..connectionMessage = MqttConnectMessage().keepAliveFor(_config.keepAlive)
      ..onConnected = _handleConnected
      ..onDisconnected = _handleDisconnected;
  }

  /// Returns the instance registered with [setInstance].
  ///
  /// Throws a [StateError] if no instance has been registered yet.
  static Mqtt getInstance() {
    final instance = _mqtt;
    if (instance == null) {
      throw StateError("Mqtt.setInstance must be called before getInstance");
    }
    return instance;
  }

  /// Registers [server] as the shared instance returned by [getInstance].
  static void setInstance(Mqtt server) {
    _mqtt = server;
  }

  /// Connects to the broker and subscribes to the configured and cached
  /// topics.
  ///
  /// When [reconnect] is `true` the first attempt is delayed by one second to
  /// limit the reconnect rate. A failed attempt is retried (with the same
  /// delay) for as long as the [Mqtt.reconnect] field is `true`; once it is
  /// `false` the method gives up and completes with `false`. Completes with
  /// `true` after a successful connect.
  ///
  /// If a connect sequence is already running, its future is returned instead
  /// of starting a second one.
  Future<bool> connect({bool reconnect = false}) {
    return _startConnect(delayFirst: reconnect, automatic: false);
  }

  /// Disconnects from the broker and disables automatic reconnection.
  void disconnect() {
    // Must be cleared before disconnecting: the disconnect callback checks
    // this flag and would otherwise reconnect right away.
    reconnect = false;
    _updates?.cancel();
    _updates = null;
    _client.disconnect();
  }

  /// Subscribes to [topic] with the given [qos] value.
  ///
  /// With [cache] set (the default) the subscription is remembered in
  /// [topicCache] and restored on every later connect.
  void subscribe(String topic, int qos, {bool cache = true}) {
    if (cache) {
      topicCache[topic] = qos;
    }
    _client.subscribe(topic, MqttUtilities.getQosLevel(qos));
  }

  /// Unsubscribes from [topic] and removes it from [topicCache].
  void unSubscribe(String topic) {
    topicCache.remove(topic);
    _client.unsubscribeStringTopic(topic);
  }

  /// Publishes the text [msg] to [topic], encoded as UTF-8.
  ///
  /// Returns `false` (after printing the error) if publishing throws,
  /// `true` otherwise.
  Future<bool> publish(String topic, String msg, {int qos = 0}) async {
    try {
      final builder = MqttPayloadBuilder()..addUTF8String(msg);
      _client.publishMessage(
          topic, MqttUtilities.getQosLevel(qos), builder.payload!);
      return true;
    } catch (e) {
      print(e);
      return false;
    }
  }

  /// Publishes the raw bytes in [data] to [topic].
  ///
  /// Returns `false` (after printing the error) if publishing throws,
  /// `true` otherwise.
  Future<bool> publishBuff(String topic, Uint8List data, {int qos = 0}) async {
    try {
      final builder = MqttPayloadBuilder()
        ..addBuffer(Uint8Buffer()..addAll(data));
      _client.publishMessage(
          topic, MqttUtilities.getQosLevel(qos), builder.payload!);
      return true;
    } catch (e) {
      print(e);
      return false;
    }
  }

  /// Whether [host] is a `ws://` or `wss://` URL, i.e. the client has to use
  /// the WebSocket transport.
  static bool _isWebSocketUrl(String host) =>
      host.startsWith("ws://") || host.startsWith("wss://");

  void _handleConnected() {
    print("mqtt服务器: ${_config.host} 连接成功");
  }

  void _handleDisconnected() {
    print("mqtt服务器: ${_config.host} 连接断开");
    // Try to re-establish the connection unless reconnecting was disabled.
    if (reconnect) {
      _startConnect(delayFirst: true, automatic: true);
    }
  }

  /// Starts a connect sequence, or returns the one already in flight.
  ///
  /// Both the disconnect callback and the retry path can ask for a connection;
  /// sharing one future keeps them from running competing sequences.
  Future<bool> _startConnect(
      {required bool delayFirst, required bool automatic}) {
    final pending = _pendingConnect;
    if (pending != null) {
      return pending;
    }
    // `Future(...)` defers the first attempt until `_pendingConnect` has been
    // assigned, so a callback fired during that attempt sees it.
    final attempt = Future<bool>(
      () => _connectLoop(delayFirst: delayFirst, automatic: automatic),
    ).whenComplete(() {
      _pendingConnect = null;
    });
    _pendingConnect = attempt;
    return attempt;
  }

  /// Attempts to connect until it succeeds or reconnecting is disabled.
  ///
  /// [automatic] marks attempts that were not explicitly requested by the
  /// caller of [connect]; those are abandoned when [reconnect] is switched
  /// off while waiting.
  Future<bool> _connectLoop(
      {required bool delayFirst, required bool automatic}) async {
    var delay = delayFirst;
    var abortable = automatic;
    while (true) {
      if (delay) {
        // Limit the reconnect rate.
        await Future.delayed(const Duration(seconds: 1));
        if (abortable && !reconnect) {
          return false;
        }
        print("mqtt服务器: ${_config.host} 尝试重连");
      }

      MqttConnectionStatus? status;
      var failed = false;
      try {
        status = await _client.connect(_config.username, _config.password);
      } catch (e) {
        // Connection error.
        print("mqtt连接失败: 地址:${_config.host} 原因:$e");
        failed = true;
      }
      if (!failed &&
          (status == null || status.state != MqttConnectionState.connected)) {
        // Connect returned without reaching the connected state.
        print("mqtt连接失败-1: 地址:${_config.host} 原因:$status");
        failed = true;
      }

      if (!failed) {
        _listenForUpdates();
        _restoreSubscriptions();
        return true;
      }
      if (!reconnect) {
        // Retrying is disabled; give up instead of looping without a delay.
        return false;
      }
      delay = true;
      abortable = true;
    }
  }

  /// (Re-)attaches the listener that forwards incoming messages.
  void _listenForUpdates() {
    _updates?.cancel();
    _updates = _client.updates.listen(_dispatch);
  }

  /// Subscribes to the topics from the configuration and from [topicCache].
  void _restoreSubscriptions() {
    final configured = _config.topic;
    if (configured != null) {
      for (final topic in configured) {
        subscribe(topic, _config.qos, cache: false);
      }
    }
    for (final entry in topicCache.entries) {
      subscribe(entry.key, entry.value, cache: false);
    }
  }

  /// Hands every received publish to the text callback if one is set,
  /// otherwise to the binary callback.
  void _dispatch(List<MqttReceivedMessage<MqttMessage>> messages) {
    final onText = _config.messgae;
    final onBinary = _config.buffMessage;
    if (onText == null && onBinary == null) {
      return;
    }
    for (final received in messages) {
      try {
        final publish = received.payload as MqttPublishMessage;
        final topic = received.topic!;
        final data = publish.payload.message!;
        if (onText != null) {
          onText(topic, utf8.decode(data));
        } else if (onBinary != null) {
          // Copy exactly the payload bytes; the backing buffer of a growable
          // typed buffer can be larger than its length.
          onBinary(topic, Uint8List.fromList(data));
        }
      } catch (e) {
        // Unexpected message type, missing topic/payload, or the payload
        // could not be decoded as text.
        print(e);
      }
    }
  }
}

/// Connection settings and message callbacks for [Mqtt].
class MqttConfig {
  /// Broker host name, or a `ws://` / `wss://` URL for WebSocket transport.
  final String host;

  /// Broker port; defaults to 1883.
  final int port;

  /// Prefix of the client identifier; [Mqtt] appends a timestamp to it.
  final String clientId;

  /// Credentials passed to the client's connect call; may be omitted.
  String? username;
  String? password;

  /// Topics subscribed with [qos] after every successful connect.
  List<String>? topic;

  /// Keep-alive value handed to the client and to its connect message.
  int keepAlive;

  /// QoS value used for the topics listed in [topic].
  int qos;

  /// Callback for messages delivered as text. Takes precedence over
  /// [buffMessage] when both are set.
  ///
  /// The misspelled name is kept for compatibility with existing callers.
  Function(String topic, String message)? messgae;

  /// Callback for messages delivered as raw bytes. Only used when [messgae]
  /// is not set.
  Function(String topic, Uint8List buff)? buffMessage;

  MqttConfig(
      {required this.host,
      this.port = 1883,
      required this.clientId,
      this.messgae,
      this.buffMessage,
      this.topic,
      this.qos = 0,
      this.username,
      this.password,
      this.keepAlive = 60});
}