From a15345adeb7d58de42037a6337463f6e08ef4fa9 Mon Sep 17 00:00:00 2001 From: qmqz Date: Wed, 5 Mar 2025 14:10:03 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8A=A0=E5=85=A5=E9=87=8D=E8=BF=9E=E6=9C=BA?= =?UTF-8?q?=E5=88=B6=E5=BF=BD=E7=95=A5=E6=97=A5=E5=BF=97=E8=AE=B0=E5=BD=95?= =?UTF-8?q?=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/base/database/impl/MongoDb.dart | 30 +++++++++++++++------- lib/base/http/TraceDio.dart | 27 ++++++++++++-------- lib/base/logger/impl/LokiLogger.dart | 11 ++++++++- lib/base/mqtt/mqtt.dart | 17 ++++++++++--- lib/base/redis/redis.dart | 37 ++++++++++++++++++++++++++-- 5 files changed, 96 insertions(+), 26 deletions(-) diff --git a/lib/base/database/impl/MongoDb.dart b/lib/base/database/impl/MongoDb.dart index 01cc0c1..8582767 100644 --- a/lib/base/database/impl/MongoDb.dart +++ b/lib/base/database/impl/MongoDb.dart @@ -1,3 +1,5 @@ +import 'dart:io'; + import 'package:EasyDartModule/base/database/DataBase.dart'; import 'package:mongo_dart/mongo_dart.dart'; @@ -7,16 +9,26 @@ class MongoDb implements DataBase { MongoDb(this.config) : db = Db( "mongodb://${config.userName}:${config.password}@${config.host}/${config.dataBase}?authSource=admin") { + connect(false); + } + + void connect(reconnect) { + if (reconnect) { + print("尝试重连MongoDb"); + } Future.delayed(Duration(seconds: 1), () async { - do { - try { - await db.open(); - print('Connected successfully!'); - } catch (e) { - print('Connection error: $e'); - await Future.delayed(Duration(seconds: 1)); - } - } while (!db.isConnected); + try { + await db.open(); + print('MongoDb Connected successfully!'); + //定时检测数据库是否断开 + do { + await Future.delayed(Duration(seconds: 5)); + } while (db.isConnected); + connect(true); + } catch (e) { + print('MongoDb Connection error: $e'); + connect(reconnect); + } }); } diff --git a/lib/base/http/TraceDio.dart b/lib/base/http/TraceDio.dart index c458932..243a286 100644 --- a/lib/base/http/TraceDio.dart +++ b/lib/base/http/TraceDio.dart @@ -36,7 +36,7 @@ class TraceDio { options.headers['X-Span-ID'] = spanId; } // 记录请求日志 - _logger?.info('Sending request: ${options.method} ${options.uri}', + _logger?.info('发送请求: ${options.method} ${options.uri}', tag: "DIO", traceId: traceId, spanId: spanId); return handler.next(options); // 继续请求 }, @@ -44,11 +44,9 @@ class TraceDio { // 记录响应日志 String? traceId = response.headers.value('X-Trace-ID'); String? spanId = response.headers.value('X-Span-ID'); - _logger?.info( - 'Response received: ${response.statusCode} ${response.statusMessage}', - tag: "DIO", - traceId: traceId, - spanId: spanId); + _logger?.info('接收数据: ${response.statusCode} ${response.statusMessage}', + tag: "DIO", traceId: traceId, spanId: spanId); + //自动更新token if (response.headers["token"] != null) { token = response.headers["token"]?[0]; } @@ -58,8 +56,18 @@ class TraceDio { // 记录错误日志 String? traceId = e.requestOptions.headers['X-Trace-ID']; String? spanId = e.requestOptions.headers['X-Span-ID']; - _logger?.error('Request failed: ${e.message}', - tag: "DIO", traceId: traceId, spanId: spanId); + String errMsg; + if (e.response != null) { + errMsg = + "请求错误: url:${e.requestOptions.path} 请求方法:${e.requestOptions.method} 请求数据:${e.requestOptions.data} 响应代码:${e.response?.statusCode} 响应消息:${e.response?.statusMessage} 响应内容:${e.response?.data}"; + } else { + errMsg = + "请求错误: url:${e.requestOptions.path} 请求方法:${e.requestOptions.method} 请求数据:${e.requestOptions.data} 错误消息:${e.error}"; + } + + print(errMsg); + _logger?.error(errMsg, tag: "DIO", traceId: traceId, spanId: spanId); + return handler.next(e); // 继续处理错误 }, )); @@ -142,8 +150,7 @@ class TraceDio { data: data, queryParameters: queryParameters, options: Options( - receiveTimeout: receiveTimeout, - headers: getHeader(request: request)), + receiveTimeout: receiveTimeout, headers: getHeader(request: request)), ); } } diff --git a/lib/base/logger/impl/LokiLogger.dart b/lib/base/logger/impl/LokiLogger.dart index bccae27..6559080 100644 --- a/lib/base/logger/impl/LokiLogger.dart +++ b/lib/base/logger/impl/LokiLogger.dart @@ -25,7 +25,14 @@ class LokiLogger implements Logger { BaseOptions(baseUrl: _config == null ? "" : _config.host, headers: { "Content-Type": "application/json", if (!identical(0, 0.0)) "Content-Encoding": "gzip" - })); + })) { + dio.interceptors.add(InterceptorsWrapper( + onError: (error, handler) => { + //忽略异常 + print("logerr:###$error###") + }, + )); + } @override void debug(String msg, {String? tag, String? traceId, String? spanId, String? parentSpanId}) { @@ -106,12 +113,14 @@ class LokiLogger implements Logger { ] }); //判断平台 + if (identical(0, 0.0)) { dio.post("/loki/api/v1/push", data: data); } else { var zip = gzip.encode(utf8.encode(data)); dio.post("/loki/api/v1/push", data: zip); } + if (level == LoggerLevel.debug || _config.print) { print(log); } diff --git a/lib/base/mqtt/mqtt.dart b/lib/base/mqtt/mqtt.dart index 4bf1625..e225ed9 100644 --- a/lib/base/mqtt/mqtt.dart +++ b/lib/base/mqtt/mqtt.dart @@ -23,16 +23,25 @@ class Mqtt { if (_client != null) { return true; } - _client = MqttServerClient.withPort(_config.host, - "${_config.clientId}_${DateTime.now().millisecondsSinceEpoch}", _config.port); + _client = MqttServerClient.withPort( + _config.host, + "${_config.clientId}_${DateTime.now().millisecondsSinceEpoch}", + _config.port); _client?.autoReconnect = true; - await _client?.connect(_config.username, _config.password); + var state = await _client?.connect(_config.username, _config.password); + if (state!.state != MqttConnectionState.connected) { + //连接失败 + print("mqtt连接失败: 地址:${_config.host} 原因:$state"); + return false; + } + print("mqtt服务器: ${_config.host} 连接成功"); + _client?.updates.listen((List> message) { final recMess = message[0].payload as MqttPublishMessage; final payload = Utf8Decoder().convert(recMess.payload.message!); // final payload = - // MqttUtilities.bytesToStringAsString(recMess.payload.message!); + // MqttUtilities.bytesToStringAsString(recMess.payload.message!); _config.messgae(message[0].topic!, payload); }); _config.topic?.forEach((topic) { diff --git a/lib/base/redis/redis.dart b/lib/base/redis/redis.dart index 689e5b3..af595e7 100644 --- a/lib/base/redis/redis.dart +++ b/lib/base/redis/redis.dart @@ -16,8 +16,41 @@ class Redis { _redis = redis; } - Future connect() async { - _command = await RedisConnection().connect(_config.host, _config.port); + Future connect({reconnect = false}) async { + if (reconnect) { + print("尝试重连Redis"); + } + Future.delayed(Duration(seconds: 1), () async { + try { + _command = await RedisConnection().connect(_config.host, _config.port); + print('Redis Connected successfully!'); + //定时检测是否断开连接 + Future.delayed(Duration(seconds: 1), () async { + do { + await Future.delayed(Duration(seconds: 5)); + try { + var r = await _command!.send_object(["PING"]); + if (r != "PONG") { + break; + } + } catch (e) { + //发送数据失败 + try { + print('Redis Connection check failed: $e'); + _command!.get_connection().close(); + } catch (o) { + // + } + break; + } + } while (true); + connect(reconnect: true); + }); + } catch (e) { + print('Redis Connection error: $e'); + connect(reconnect: reconnect); + } + }); } Future set(String key, String value) async {