diff --git a/lib/EasyDartModule.dart b/lib/EasyDartModule.dart index 6282c2a..70e1d68 100644 --- a/lib/EasyDartModule.dart +++ b/lib/EasyDartModule.dart @@ -8,6 +8,7 @@ import 'package:EasyDartModule/base/http/TraceDio.dart'; import 'package:EasyDartModule/base/logger/Logger.dart'; import 'package:EasyDartModule/base/logger/impl/LokiLogger.dart'; import 'package:EasyDartModule/base/mqtt/mqtt.dart'; +import 'package:EasyDartModule/base/redis/redis.dart'; import 'package:EasyDartModule/base/storage/Storage.dart'; import 'package:EasyDartModule/base/storage/impl/MinIoStorage.dart'; import 'package:EasyDartModule/base/webserver/WebServer.dart'; @@ -24,12 +25,15 @@ class EasyDartModule { static TraceDio get dio => TraceDio.getInstance(); static Mqtt get mqtt => Mqtt.getInstance(); static Storage get storage => Storage.getInstance(); + static Redis get redis => Redis.getInstance(); + static bool init( {DiscoveryConfig? discoveryConfig, DataBaseConfig? dataBaseConfig, LoggerConfig? loggerConfig, MqttConfig? mqttConfig, - StorageConfig? storageConfig}) { + StorageConfig? storageConfig, + RedisConfig? redisConfig}) { if (discoveryConfig != null) { //nacos注册配置中心 Discovery.setInstance(NacosDiscovery(discoveryConfig)); @@ -41,11 +45,16 @@ class EasyDartModule { if (mqttConfig != null) { //mqtt Mqtt.setInstance(Mqtt(mqttConfig)); + mqtt.connect(); } if (storageConfig != null) { //s3存储 Storage.setInstance(MinioStorage(storageConfig)); } + if (redisConfig != null) { + Redis.setInstance(Redis(redisConfig)); + redis.connect(); + } if (loggerConfig != null) { //初始化日志 Logger.setInstance(LokiLogger(loggerConfig)); diff --git a/lib/base/http/TraceDio.dart b/lib/base/http/TraceDio.dart index 1bfe12a..c3ecab1 100644 --- a/lib/base/http/TraceDio.dart +++ b/lib/base/http/TraceDio.dart @@ -1,10 +1,12 @@ import 'package:EasyDartModule/base/logger/Logger.dart'; import 'package:dio/dio.dart'; import 'package:shelf/shelf.dart' as sf; +import 'package:uuid/uuid.dart'; class TraceDio { final Dio _dio; final Logger _logger; + final Uuid uuid = Uuid(); static late TraceDio _traceDio; @@ -20,36 +22,40 @@ class TraceDio { _dio.options.preserveHeaderCase = true; // 设置拦截器,自动添加 traceId 和 spanId,并记录日志 - String traceId = "none"; - String spanId = "none"; _dio.interceptors.add(InterceptorsWrapper( onRequest: (options, handler) { // 获取请求中的 traceId(如果没有则生成) - traceId = options.headers['X-Trace-ID']; - spanId = options.headers['X-Span-ID']; - - // 在请求头中添加 traceId 和 spanId - // options.headers['X-Trace-ID'] = traceId; - // options.headers['X-Span-ID'] = spanId; - + String? traceId = options.headers['X-Trace-ID']; + String? spanId = options.headers['X-Span-ID']; + if (traceId == null) { + // 在请求头中添加 traceId 和 spanId + traceId = uuid.v4(); + spanId = uuid.v4(); + options.headers['X-Trace-ID'] = traceId; + options.headers['X-Span-ID'] = spanId; + } // 记录请求日志 - _logger.info( - 'traceId=$traceId, spanId=$spanId Sending request: ${options.method} ${options.uri}', - tag: "DIO"); + _logger.info('Sending request: ${options.method} ${options.uri}', + tag: "DIO", traceId: traceId, spanId: spanId); return handler.next(options); // 继续请求 }, onResponse: (response, handler) { // 记录响应日志 + String? traceId = response.headers.value('X-Trace-ID'); + String? spanId = response.headers.value('X-Span-ID'); _logger.info( - 'traceId=$traceId, spanId=$spanId Response received: ${response.statusCode} ${response.statusMessage}', - tag: "DIO"); + 'Response received: ${response.statusCode} ${response.statusMessage}', + tag: "DIO", + traceId: traceId, + spanId: spanId); return handler.next(response); // 继续处理响应 }, onError: (DioException e, handler) { // 记录错误日志 - _logger.error( - 'traceId=$traceId, spanId=$spanId Request failed: ${e.message}', - tag: "DIO"); + String? traceId = e.response?.headers.value('X-Trace-ID'); + String? spanId = e.response?.headers.value('X-Span-ID'); + _logger.error('Request failed: ${e.message}', + tag: "DIO", traceId: traceId, spanId: spanId); return handler.next(e); // 继续处理错误 }, )); diff --git a/lib/base/logger/Logger.dart b/lib/base/logger/Logger.dart index 614feaa..3344f92 100644 --- a/lib/base/logger/Logger.dart +++ b/lib/base/logger/Logger.dart @@ -9,9 +9,14 @@ abstract class Logger { _logger = logger; } - void info(String msg, {String tag}); - void warning(String msg, {String tag}); - void error(String msg, {String tag}); + void debug(String msg, + {String? tag, String? traceId, String? spanId, String? parentSpanId}); + void info(String msg, + {String? tag, String? traceId, String? spanId, String? parentSpanId}); + void warning(String msg, + {String? tag, String? traceId, String? spanId, String? parentSpanId}); + void error(String msg, + {String? tag, String? traceId, String? spanId, String? parentSpanId}); } class LoggerConfig { diff --git a/lib/base/logger/impl/LokiLogger.dart b/lib/base/logger/impl/LokiLogger.dart index afe8453..d063216 100644 --- a/lib/base/logger/impl/LokiLogger.dart +++ b/lib/base/logger/impl/LokiLogger.dart @@ -27,29 +27,64 @@ class LokiLogger implements Logger { "Content-Type": "application/json", "Content-Encoding": "gzip" })); - @override - void info(String msg, {String? tag}) { - log(msg, level: LoggerLevel.info, tag: tag); + void debug(String msg, + {String? tag, String? traceId, String? spanId, String? parentSpanId}) { + log(msg, + level: LoggerLevel.debug, + tag: tag, + traceId: traceId, + spanId: spanId, + parentSpanId: parentSpanId); } @override - void warning(String msg, {String? tag}) { - log(msg, level: LoggerLevel.warning, tag: tag); + void info(String msg, + {String? tag, String? traceId, String? spanId, String? parentSpanId}) { + log(msg, + level: LoggerLevel.info, + tag: tag, + traceId: traceId, + spanId: spanId, + parentSpanId: parentSpanId); } @override - void error(String msg, {String? tag}) { - log(msg, level: LoggerLevel.error, tag: tag); + void warning(String msg, + {String? tag, String? traceId, String? spanId, String? parentSpanId}) { + log(msg, + level: LoggerLevel.warning, + tag: tag, + traceId: traceId, + spanId: spanId, + parentSpanId: parentSpanId); } - void log(String msg, {required LoggerLevel level, String? tag}) { + @override + void error(String msg, + {String? tag, String? traceId, String? spanId, String? parentSpanId}) { + log(msg, + level: LoggerLevel.error, + tag: tag, + traceId: traceId, + spanId: spanId, + parentSpanId: parentSpanId); + } + + void log(String msg, + {required LoggerLevel level, + String? tag, + String? traceId, + String? spanId, + String? parentSpanId}) { if (level.level < this.level.level) { //日志等级小于设置的输出日志等级 return; } + String log = + "traceId=$traceId, spanId=$spanId parentSpanId=$parentSpanId tag=$tag ${level.name.toUpperCase()} $msg"; if (_config == null) { - print("$tag $level $msg"); + print(log); } else { //推送到loki服务器 //{_config.url} @@ -59,14 +94,22 @@ class LokiLogger implements Logger { var zip = gzip.encode(utf8.encode(jsonEncode({ "streams": [ { - "stream": {"service_name": _config.serviceName}, + "stream": { + "service_name": _config.serviceName, + // "traceId": traceId, + // "spanId": spanId, + // "parentSpanId": parentSpanId + }, "values": [ - [nanoseconds.toString(), "$tag ${level.name.toUpperCase()} $msg"] + [nanoseconds.toString(), log] ] } ] }))); dio.post("/loki/api/v1/push", data: zip); + if (level == LoggerLevel.debug) { + print(log); + } } } } diff --git a/lib/base/redis/redis.dart b/lib/base/redis/redis.dart new file mode 100644 index 0000000..3fb2b57 --- /dev/null +++ b/lib/base/redis/redis.dart @@ -0,0 +1,40 @@ +import 'package:redis/redis.dart'; + +class Redis { + final RedisConfig _config; + Command? _command; + + Redis(this._config); + + static late Redis _redis; + + static Redis getInstance() { + return _redis; + } + + static void setInstance(Redis redis) { + _redis = redis; + } + + Future connect() async { + _command = await RedisConnection().connect(_config.host, _config.port); + } + + Future set(String key, String value) async { + var response = await _command?.send_object(["SET", key, value]); + return response == "OK"; + } + + Future get(String key) async { + return await _command?.send_object(["GET", key]); + } +} + +class RedisConfig { + String host; + int port; + + RedisConfig( + {required this.host, + this.port = 6379,}); +} diff --git a/lib/base/webserver/impl/ShelfWebServer.dart b/lib/base/webserver/impl/ShelfWebServer.dart index d1a6162..747fe98 100644 --- a/lib/base/webserver/impl/ShelfWebServer.dart +++ b/lib/base/webserver/impl/ShelfWebServer.dart @@ -28,9 +28,11 @@ class ShelfWebServer implements WebServer { final requestSpanId = request.context['request_span_id'] as String; final requestParentSpanId = request.context['request_parent_span_id'] as String?; - final String logPer = - "traceId=$requestId spanId=$requestSpanId parentSpanId=$requestParentSpanId"; - logger.info('$logPer | 请求路径: ${request.requestedUri}', tag: tag); + logger.info('| 请求路径: ${request.requestedUri}', + tag: tag, + traceId: requestId, + spanId: requestSpanId, + parentSpanId: requestParentSpanId); // final stopwatch = Stopwatch()..start(); final stopwatch = request.context["request_stop_watch"] as Stopwatch; stopwatch.start(); @@ -39,18 +41,25 @@ class ShelfWebServer implements WebServer { response = await innerHandler(request); stopwatch.stop(); logger.info( - '$logPer | 响应状态码: ${response.statusCode} | 响应时间: ${stopwatch.elapsedMilliseconds}ms', - tag: tag); + '| 响应状态码: ${response.statusCode} | 响应时间: ${stopwatch.elapsedMilliseconds}ms', + tag: tag, + traceId: requestId, + spanId: requestSpanId, + parentSpanId: requestParentSpanId); } catch (e, s) { if (e is HijackException) { //不能处理该异常直接抛出 throw e; } stopwatch.stop(); - logger.error("$logPer | 服务器错误 | ${e.toString()} ${s.toString()}", - tag: tag); + logger.error("| 服务器错误 | ${e.toString()} ${s.toString()}", + tag: tag, + traceId: requestId, + spanId: requestSpanId, + parentSpanId: requestParentSpanId); response = Response(500, - body: "Internal Server Error\r\nTraceId:$requestId"); + body: + "Internal Server Error\r\nTraceId:$requestId\r\nSpanId:$requestSpanId"); } return response; }; @@ -92,7 +101,7 @@ class ShelfWebServer implements WebServer { _wsCall[path]!.open(spanId); // 监听 WebSocket 消息 channel.stream.listen((message) { - print('Received message: $message'); + // print('Received message: $message'); // 处理消息并回应客户端 // channel.sink.add(message); _wsCall[path]!.message(spanId, message); @@ -107,12 +116,16 @@ class ShelfWebServer implements WebServer { final stopwatch = request.context["request_stop_watch"] as Stopwatch; stopwatch.stop(); logger.info( - 'traceId=$requestId spanId=$spanId parentSpanId=$requestParentSpanId | WebSocket closed | 连接时长: ${stopwatch.elapsedMilliseconds}ms', - tag: "webserver"); + 'parentSpanId=$requestParentSpanId | WebSocket closed | 连接时长: ${stopwatch.elapsedMilliseconds}ms', + tag: "webserver", + traceId: requestId, + spanId: spanId); }, onError: (error) { logger.info( - 'traceId=$requestId spanId=$spanId parentSpanId=$requestParentSpanId | WebSocket error: $error', - tag: "webserver"); + 'parentSpanId=$requestParentSpanId | WebSocket error: $error', + tag: "webserver", + traceId: requestId, + spanId: spanId); }); })(request); } else {