diff --git a/lib/EasyDartModule.dart b/lib/EasyDartModule.dart index 1de3c8d..e9d9c9a 100644 --- a/lib/EasyDartModule.dart +++ b/lib/EasyDartModule.dart @@ -17,6 +17,8 @@ import 'package:EasyDartModule/base/webserver/impl/ShelfWebServer.dart' import 'package:EasyDartModule/base/websocket/WebSocket.dart'; import 'package:event_bus/event_bus.dart'; +export 'package:EasyDartModule/base/language/extensions/StringExt.dart'; + export 'package:shelf/shelf.dart'; export 'package:mongo_dart/mongo_dart.dart'; diff --git a/lib/base/discovery/impl/NacosDiscovery.dart b/lib/base/discovery/impl/NacosDiscovery.dart index 958e3f5..c958960 100644 --- a/lib/base/discovery/impl/NacosDiscovery.dart +++ b/lib/base/discovery/impl/NacosDiscovery.dart @@ -59,12 +59,14 @@ class NacosDiscovery implements Discovery { 'groupName': groupName, 'namespaceId': config.namespaceId, }); - print(rr); + if (rr.data["code"] != 10200) { + print(rr); + } //判断心跳是否发送成功 - if(rr.data["code"]==20404){ + if (rr.data["code"] == 20404) { //实例未注册 重新注册实例 healthCheck = false; - registerInstance(serviceName, ip, port,groupName: groupName); + registerInstance(serviceName, ip, port, groupName: groupName); } } catch (e) { print(e); diff --git a/lib/base/language/Language.dart b/lib/base/language/Language.dart new file mode 100644 index 0000000..5a6b3c2 --- /dev/null +++ b/lib/base/language/Language.dart @@ -0,0 +1,68 @@ +import 'dart:convert'; +import 'dart:io'; + +class Language { + static final String defalutLocale = "zh_CN"; + + static final Map> _translations = {}; + + static void setLanguage(String locale, + {String? path, Map? language}) { + //载入json文件 + if (path != null) { + try { + //暂时只支持本地文件 + var file = File(path); + file.readAsString().then((data) { + _translations[locale] = _flattenJson(jsonDecode(data)); + }); + } catch (e) { + print(e); + } + return; + } + + if (language != null) { + //构建翻译kv + _translations[locale] = _flattenJson(language); + } + } + + // 扁平化嵌套 JSON 数据 + static Map _flattenJson(Map json, + [String prefix = '']) { + final Map flatMap = {}; + json.forEach((key, value) { + final newKey = prefix.isEmpty ? key : '$prefix.$key'; + if (value is Map) { + flatMap.addAll(_flattenJson(value, newKey)); + } else { + flatMap[newKey] = value.toString(); + } + }); + return flatMap; + } + + static void resetLanguage({String? locale}) { + if (locale == null) { + //清空全部语言文件 + _translations.clear(); + } else { + //清空指定语言文件 + _translations.remove(locale); + } + } + + static String getTranslation(String key, + {String? languageCode, Map? params}) { + var locale = languageCode ?? defalutLocale; + String text = _translations[locale]?[key] ?? key; + + // 替换动态参数(如 @name) + params?.forEach((key, value) { + text = text.replaceAll('@$key', value); + }); + + return text; + } +} diff --git a/lib/base/language/extensions/StringExt.dart b/lib/base/language/extensions/StringExt.dart new file mode 100644 index 0000000..8e14896 --- /dev/null +++ b/lib/base/language/extensions/StringExt.dart @@ -0,0 +1,8 @@ +import '../Language.dart'; + +extension StringTransform on String { + String tr({Map? params, String? languageCode}) { + return Language.getTranslation(this, + params: params, languageCode: languageCode); + } +} diff --git a/lib/base/mqtt/mqtt.dart b/lib/base/mqtt/mqtt.dart index 0bed22a..e39e43d 100644 --- a/lib/base/mqtt/mqtt.dart +++ b/lib/base/mqtt/mqtt.dart @@ -7,8 +7,35 @@ import 'package:mqtt5_client/mqtt5_server_client.dart'; class Mqtt { final MqttConfig _config; MqttClient? _client; + //标记是否自动重连 + bool reconnect = true; + //缓存自定义主题订阅信息 + Map topicCache = {}; - Mqtt(this._config); + Mqtt(this._config) { + _client = MqttServerClient.withPort( + _config.host, + "${_config.clientId}_${DateTime.now().millisecondsSinceEpoch}", + _config.port) + ..useWebSocket = + _config.host.startsWith("ws") || _config.host.startsWith("wss"); + _client?.keepAlivePeriod = _config.keepAlive; + _client?.connectionMessage = + MqttConnectMessage().keepAliveFor(_config.keepAlive); + // _client?.autoReconnect = true; + _client?.onConnected = () { + // print("aa:mqtt服务器连接成功"); + print("mqtt服务器: ${_config.host} 连接成功"); + }; + _client?.onDisconnected = () { + // print("aa:mqtt服务器连接断开"); + print("mqtt服务器: ${_config.host} 连接断开"); + //执行重连操作 + if (reconnect) { + connect(reconnect: true); + } + }; + } static late Mqtt _mqtt; @@ -20,30 +47,26 @@ class Mqtt { _mqtt = server; } - Future connect() async { - if (_client != null) { - return true; + Future connect({bool reconnect = false}) async { + if (reconnect) { + //限制重连速率 + await Future.delayed(Duration(seconds: 1)); + print("mqtt服务器: ${_config.host} 尝试重连"); } - _client = MqttServerClient.withPort( - _config.host, - "${_config.clientId}_${DateTime.now().millisecondsSinceEpoch}", - _config.port); - _client?.autoReconnect = true; - MqttConnectionStatus? state; try { state = await _client?.connect(_config.username, _config.password); } catch (e) { //连接错误 print("mqtt连接失败: 地址:${_config.host} 原因:$e"); - return false; + return connect(reconnect: this.reconnect); } - if (state!.state != MqttConnectionState.connected) { + if (state == null || state.state != MqttConnectionState.connected) { //连接失败 - print("mqtt连接失败: 地址:${_config.host} 原因:$state"); - return false; + print("mqtt连接失败-1: 地址:${_config.host} 原因:$state"); + return connect(reconnect: this.reconnect); } - print("mqtt服务器: ${_config.host} 连接成功"); + // print("mqtt服务器: ${_config.host} 连接成功"); _client?.updates.listen((List> message) { final recMess = message[0].payload as MqttPublishMessage; @@ -63,7 +86,10 @@ class Mqtt { } }); _config.topic?.forEach((topic) { - subscribe(topic, _config.qos); + subscribe(topic, _config.qos, cache: false); + }); + topicCache.forEach((t, q) { + subscribe(t, q, cache: false); }); return true; @@ -71,22 +97,33 @@ class Mqtt { void disconnect() { _client?.disconnect(); - _client = null; + reconnect = false; + // _client = null; } - void subscribe(String topic, int qos) { + void subscribe(String topic, int qos, {bool cache = true}) { + if (cache) { + topicCache[topic] = qos; + } _client?.subscribe(topic, MqttUtilities.getQosLevel(qos)); } void unSubscribe(String topic) { + topicCache.remove(topic); _client?.unsubscribeStringTopic(topic); } - void publish(String topic, String msg, {int qos = 0}) { - var payload = MqttPayloadBuilder(); - payload.addUTF8String(msg); - _client?.publishMessage( - topic, MqttUtilities.getQosLevel(qos), payload.payload!); + Future publish(String topic, String msg, {int qos = 0}) async { + try { + var payload = MqttPayloadBuilder(); + payload.addUTF8String(msg); + _client?.publishMessage( + topic, MqttUtilities.getQosLevel(qos), payload.payload!); + return true; + } catch (e) { + print(e); + return false; + } } } @@ -97,6 +134,7 @@ class MqttConfig { String? username; String? password; List? topic; + int keepAlive; int qos; //文本消息 Function(String topic, String message)? messgae; @@ -112,5 +150,6 @@ class MqttConfig { this.topic, this.qos = 0, this.username, - this.password}); + this.password, + this.keepAlive = 60}); } diff --git a/lib/base/redis/redis.dart b/lib/base/redis/redis.dart index 6c29040..86831d8 100644 --- a/lib/base/redis/redis.dart +++ b/lib/base/redis/redis.dart @@ -3,6 +3,7 @@ import 'package:redis/redis.dart'; class Redis { final RedisConfig _config; Command? _command; + bool _connected = false; Redis(this._config); @@ -16,7 +17,12 @@ class Redis { _redis = redis; } + bool isConnected() { + return this._connected; + } + Future connect({reconnect = false}) async { + _connected = false; if (reconnect) { print("尝试重连Redis"); } @@ -24,6 +30,7 @@ class Redis { try { _command = await RedisConnection().connect(_config.host, _config.port); print('Redis Connected successfully!'); + _connected = true; //定时检测是否断开连接 Future.delayed(Duration(seconds: 1), () async { do { @@ -54,22 +61,42 @@ class Redis { } Future set(String key, String value) async { - var response = await _command?.send_object(["SET", key, value]); - return response == "OK"; + try { + var response = await _command?.send_object(["SET", key, value]); + return response == "OK"; + } catch (e) { + print(e); + return false; + } } Future get(String key) async { - return await _command?.send_object(["GET", key]); + try { + return await _command?.send_object(["GET", key]); + } catch (e) { + print(e); + return null; + } } Future delete(String key) async { - return await _command?.send_object(["DEL", key]) == "OK"; + try { + return await _command?.send_object(["DEL", key]) == "OK"; + } catch (e) { + print(e); + return false; + } } Future setWithExpiry(String key, String value, int ttlInSeconds) async { - var response = await _command - ?.send_object(["SETEX", key, ttlInSeconds.toString(), value]); - return response == "OK"; + try { + var response = await _command + ?.send_object(["SETEX", key, ttlInSeconds.toString(), value]); + return response == "OK"; + } catch (e) { + print(e); + return false; + } } } diff --git a/pubspec.yaml b/pubspec.yaml index a80ed51..3275d6f 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -15,8 +15,9 @@ dependencies: shelf: ^1.4.2 shelf_router: ^1.1.4 shelf_web_socket: ^2.0.1 + shelf_multipart: ^2.0.1 uuid: ^4.5.1 - mqtt5_client: ^4.6.2 + mqtt5_client: ^4.11.0 minio: ^3.5.7 redis: ^4.0.0 dart_jsonwebtoken: ^2.14.2