加入重连机制忽略日志记录错误

This commit is contained in:
2025-03-05 14:10:03 +08:00
parent a2155976f3
commit a15345adeb
5 changed files with 96 additions and 26 deletions

View File

@@ -1,3 +1,5 @@
import 'dart:io';
import 'package:EasyDartModule/base/database/DataBase.dart'; import 'package:EasyDartModule/base/database/DataBase.dart';
import 'package:mongo_dart/mongo_dart.dart'; import 'package:mongo_dart/mongo_dart.dart';
@@ -7,16 +9,26 @@ class MongoDb implements DataBase {
MongoDb(this.config) MongoDb(this.config)
: db = Db( : db = Db(
"mongodb://${config.userName}:${config.password}@${config.host}/${config.dataBase}?authSource=admin") { "mongodb://${config.userName}:${config.password}@${config.host}/${config.dataBase}?authSource=admin") {
connect(false);
}
void connect(reconnect) {
if (reconnect) {
print("尝试重连MongoDb");
}
Future.delayed(Duration(seconds: 1), () async { Future.delayed(Duration(seconds: 1), () async {
do { try {
try { await db.open();
await db.open(); print('MongoDb Connected successfully!');
print('Connected successfully!'); //定时检测数据库是否断开
} catch (e) { do {
print('Connection error: $e'); await Future.delayed(Duration(seconds: 5));
await Future.delayed(Duration(seconds: 1)); } while (db.isConnected);
} connect(true);
} while (!db.isConnected); } catch (e) {
print('MongoDb Connection error: $e');
connect(reconnect);
}
}); });
} }

View File

@@ -36,7 +36,7 @@ class TraceDio {
options.headers['X-Span-ID'] = spanId; options.headers['X-Span-ID'] = spanId;
} }
// 记录请求日志 // 记录请求日志
_logger?.info('Sending request: ${options.method} ${options.uri}', _logger?.info('发送请求: ${options.method} ${options.uri}',
tag: "DIO", traceId: traceId, spanId: spanId); tag: "DIO", traceId: traceId, spanId: spanId);
return handler.next(options); // 继续请求 return handler.next(options); // 继续请求
}, },
@@ -44,11 +44,9 @@ class TraceDio {
// 记录响应日志 // 记录响应日志
String? traceId = response.headers.value('X-Trace-ID'); String? traceId = response.headers.value('X-Trace-ID');
String? spanId = response.headers.value('X-Span-ID'); String? spanId = response.headers.value('X-Span-ID');
_logger?.info( _logger?.info('接收数据: ${response.statusCode} ${response.statusMessage}',
'Response received: ${response.statusCode} ${response.statusMessage}', tag: "DIO", traceId: traceId, spanId: spanId);
tag: "DIO", //自动更新token
traceId: traceId,
spanId: spanId);
if (response.headers["token"] != null) { if (response.headers["token"] != null) {
token = response.headers["token"]?[0]; token = response.headers["token"]?[0];
} }
@@ -58,8 +56,18 @@ class TraceDio {
// 记录错误日志 // 记录错误日志
String? traceId = e.requestOptions.headers['X-Trace-ID']; String? traceId = e.requestOptions.headers['X-Trace-ID'];
String? spanId = e.requestOptions.headers['X-Span-ID']; String? spanId = e.requestOptions.headers['X-Span-ID'];
_logger?.error('Request failed: ${e.message}', String errMsg;
tag: "DIO", traceId: traceId, spanId: spanId); if (e.response != null) {
errMsg =
"请求错误: url:${e.requestOptions.path} 请求方法:${e.requestOptions.method} 请求数据:${e.requestOptions.data} 响应代码:${e.response?.statusCode} 响应消息:${e.response?.statusMessage} 响应内容:${e.response?.data}";
} else {
errMsg =
"请求错误: url:${e.requestOptions.path} 请求方法:${e.requestOptions.method} 请求数据:${e.requestOptions.data} 错误消息:${e.error}";
}
print(errMsg);
_logger?.error(errMsg, tag: "DIO", traceId: traceId, spanId: spanId);
return handler.next(e); // 继续处理错误 return handler.next(e); // 继续处理错误
}, },
)); ));
@@ -142,8 +150,7 @@ class TraceDio {
data: data, data: data,
queryParameters: queryParameters, queryParameters: queryParameters,
options: Options( options: Options(
receiveTimeout: receiveTimeout, receiveTimeout: receiveTimeout, headers: getHeader(request: request)),
headers: getHeader(request: request)),
); );
} }
} }

View File

@@ -25,7 +25,14 @@ class LokiLogger implements Logger {
BaseOptions(baseUrl: _config == null ? "" : _config.host, headers: { BaseOptions(baseUrl: _config == null ? "" : _config.host, headers: {
"Content-Type": "application/json", "Content-Type": "application/json",
if (!identical(0, 0.0)) "Content-Encoding": "gzip" if (!identical(0, 0.0)) "Content-Encoding": "gzip"
})); })) {
dio.interceptors.add(InterceptorsWrapper(
onError: (error, handler) => {
//忽略异常
print("logerr:###$error###")
},
));
}
@override @override
void debug(String msg, void debug(String msg,
{String? tag, String? traceId, String? spanId, String? parentSpanId}) { {String? tag, String? traceId, String? spanId, String? parentSpanId}) {
@@ -106,12 +113,14 @@ class LokiLogger implements Logger {
] ]
}); });
//判断平台 //判断平台
if (identical(0, 0.0)) { if (identical(0, 0.0)) {
dio.post("/loki/api/v1/push", data: data); dio.post("/loki/api/v1/push", data: data);
} else { } else {
var zip = gzip.encode(utf8.encode(data)); var zip = gzip.encode(utf8.encode(data));
dio.post("/loki/api/v1/push", data: zip); dio.post("/loki/api/v1/push", data: zip);
} }
if (level == LoggerLevel.debug || _config.print) { if (level == LoggerLevel.debug || _config.print) {
print(log); print(log);
} }

View File

@@ -23,16 +23,25 @@ class Mqtt {
if (_client != null) { if (_client != null) {
return true; return true;
} }
_client = MqttServerClient.withPort(_config.host, _client = MqttServerClient.withPort(
"${_config.clientId}_${DateTime.now().millisecondsSinceEpoch}", _config.port); _config.host,
"${_config.clientId}_${DateTime.now().millisecondsSinceEpoch}",
_config.port);
_client?.autoReconnect = true; _client?.autoReconnect = true;
await _client?.connect(_config.username, _config.password); var state = await _client?.connect(_config.username, _config.password);
if (state!.state != MqttConnectionState.connected) {
//连接失败
print("mqtt连接失败: 地址:${_config.host} 原因:$state");
return false;
}
print("mqtt服务器: ${_config.host} 连接成功");
_client?.updates.listen((List<MqttReceivedMessage<MqttMessage>> message) { _client?.updates.listen((List<MqttReceivedMessage<MqttMessage>> message) {
final recMess = message[0].payload as MqttPublishMessage; final recMess = message[0].payload as MqttPublishMessage;
final payload = Utf8Decoder().convert(recMess.payload.message!); final payload = Utf8Decoder().convert(recMess.payload.message!);
// final payload = // final payload =
// MqttUtilities.bytesToStringAsString(recMess.payload.message!); // MqttUtilities.bytesToStringAsString(recMess.payload.message!);
_config.messgae(message[0].topic!, payload); _config.messgae(message[0].topic!, payload);
}); });
_config.topic?.forEach((topic) { _config.topic?.forEach((topic) {

View File

@@ -16,8 +16,41 @@ class Redis {
_redis = redis; _redis = redis;
} }
Future<void> connect() async { Future<void> connect({reconnect = false}) async {
_command = await RedisConnection().connect(_config.host, _config.port); if (reconnect) {
print("尝试重连Redis");
}
Future.delayed(Duration(seconds: 1), () async {
try {
_command = await RedisConnection().connect(_config.host, _config.port);
print('Redis Connected successfully!');
//定时检测是否断开连接
Future.delayed(Duration(seconds: 1), () async {
do {
await Future.delayed(Duration(seconds: 5));
try {
var r = await _command!.send_object(["PING"]);
if (r != "PONG") {
break;
}
} catch (e) {
//发送数据失败
try {
print('Redis Connection check failed: $e');
_command!.get_connection().close();
} catch (o) {
//
}
break;
}
} while (true);
connect(reconnect: true);
});
} catch (e) {
print('Redis Connection error: $e');
connect(reconnect: reconnect);
}
});
} }
Future<bool> set(String key, String value) async { Future<bool> set(String key, String value) async {