修改日志接口 新增redis

This commit is contained in:
qmqz
2025-01-07 16:50:57 +08:00
parent fd62ed3d98
commit b2fe531919
6 changed files with 161 additions and 45 deletions

View File

@@ -8,6 +8,7 @@ import 'package:EasyDartModule/base/http/TraceDio.dart';
import 'package:EasyDartModule/base/logger/Logger.dart'; import 'package:EasyDartModule/base/logger/Logger.dart';
import 'package:EasyDartModule/base/logger/impl/LokiLogger.dart'; import 'package:EasyDartModule/base/logger/impl/LokiLogger.dart';
import 'package:EasyDartModule/base/mqtt/mqtt.dart'; import 'package:EasyDartModule/base/mqtt/mqtt.dart';
import 'package:EasyDartModule/base/redis/redis.dart';
import 'package:EasyDartModule/base/storage/Storage.dart'; import 'package:EasyDartModule/base/storage/Storage.dart';
import 'package:EasyDartModule/base/storage/impl/MinIoStorage.dart'; import 'package:EasyDartModule/base/storage/impl/MinIoStorage.dart';
import 'package:EasyDartModule/base/webserver/WebServer.dart'; import 'package:EasyDartModule/base/webserver/WebServer.dart';
@@ -24,12 +25,15 @@ class EasyDartModule {
static TraceDio get dio => TraceDio.getInstance(); static TraceDio get dio => TraceDio.getInstance();
static Mqtt get mqtt => Mqtt.getInstance(); static Mqtt get mqtt => Mqtt.getInstance();
static Storage get storage => Storage.getInstance(); static Storage get storage => Storage.getInstance();
static Redis get redis => Redis.getInstance();
static bool init( static bool init(
{DiscoveryConfig? discoveryConfig, {DiscoveryConfig? discoveryConfig,
DataBaseConfig? dataBaseConfig, DataBaseConfig? dataBaseConfig,
LoggerConfig? loggerConfig, LoggerConfig? loggerConfig,
MqttConfig? mqttConfig, MqttConfig? mqttConfig,
StorageConfig? storageConfig}) { StorageConfig? storageConfig,
RedisConfig? redisConfig}) {
if (discoveryConfig != null) { if (discoveryConfig != null) {
//nacos注册配置中心 //nacos注册配置中心
Discovery.setInstance(NacosDiscovery(discoveryConfig)); Discovery.setInstance(NacosDiscovery(discoveryConfig));
@@ -41,11 +45,16 @@ class EasyDartModule {
if (mqttConfig != null) { if (mqttConfig != null) {
//mqtt //mqtt
Mqtt.setInstance(Mqtt(mqttConfig)); Mqtt.setInstance(Mqtt(mqttConfig));
mqtt.connect();
} }
if (storageConfig != null) { if (storageConfig != null) {
//s3存储 //s3存储
Storage.setInstance(MinioStorage(storageConfig)); Storage.setInstance(MinioStorage(storageConfig));
} }
if (redisConfig != null) {
Redis.setInstance(Redis(redisConfig));
redis.connect();
}
if (loggerConfig != null) { if (loggerConfig != null) {
//初始化日志 //初始化日志
Logger.setInstance(LokiLogger(loggerConfig)); Logger.setInstance(LokiLogger(loggerConfig));

View File

@@ -1,10 +1,12 @@
import 'package:EasyDartModule/base/logger/Logger.dart'; import 'package:EasyDartModule/base/logger/Logger.dart';
import 'package:dio/dio.dart'; import 'package:dio/dio.dart';
import 'package:shelf/shelf.dart' as sf; import 'package:shelf/shelf.dart' as sf;
import 'package:uuid/uuid.dart';
class TraceDio { class TraceDio {
final Dio _dio; final Dio _dio;
final Logger _logger; final Logger _logger;
final Uuid uuid = Uuid();
static late TraceDio _traceDio; static late TraceDio _traceDio;
@@ -20,36 +22,40 @@ class TraceDio {
_dio.options.preserveHeaderCase = true; _dio.options.preserveHeaderCase = true;
// 设置拦截器,自动添加 traceId 和 spanId并记录日志 // 设置拦截器,自动添加 traceId 和 spanId并记录日志
String traceId = "none";
String spanId = "none";
_dio.interceptors.add(InterceptorsWrapper( _dio.interceptors.add(InterceptorsWrapper(
onRequest: (options, handler) { onRequest: (options, handler) {
// 获取请求中的 traceId如果没有则生成 // 获取请求中的 traceId如果没有则生成
traceId = options.headers['X-Trace-ID']; String? traceId = options.headers['X-Trace-ID'];
spanId = options.headers['X-Span-ID']; String? spanId = options.headers['X-Span-ID'];
if (traceId == null) {
// 在请求头中添加 traceId 和 spanId // 在请求头中添加 traceId 和 spanId
// options.headers['X-Trace-ID'] = traceId; traceId = uuid.v4();
// options.headers['X-Span-ID'] = spanId; spanId = uuid.v4();
options.headers['X-Trace-ID'] = traceId;
options.headers['X-Span-ID'] = spanId;
}
// 记录请求日志 // 记录请求日志
_logger.info( _logger.info('Sending request: ${options.method} ${options.uri}',
'traceId=$traceId, spanId=$spanId Sending request: ${options.method} ${options.uri}', tag: "DIO", traceId: traceId, spanId: spanId);
tag: "DIO");
return handler.next(options); // 继续请求 return handler.next(options); // 继续请求
}, },
onResponse: (response, handler) { onResponse: (response, handler) {
// 记录响应日志 // 记录响应日志
String? traceId = response.headers.value('X-Trace-ID');
String? spanId = response.headers.value('X-Span-ID');
_logger.info( _logger.info(
'traceId=$traceId, spanId=$spanId Response received: ${response.statusCode} ${response.statusMessage}', 'Response received: ${response.statusCode} ${response.statusMessage}',
tag: "DIO"); tag: "DIO",
traceId: traceId,
spanId: spanId);
return handler.next(response); // 继续处理响应 return handler.next(response); // 继续处理响应
}, },
onError: (DioException e, handler) { onError: (DioException e, handler) {
// 记录错误日志 // 记录错误日志
_logger.error( String? traceId = e.response?.headers.value('X-Trace-ID');
'traceId=$traceId, spanId=$spanId Request failed: ${e.message}', String? spanId = e.response?.headers.value('X-Span-ID');
tag: "DIO"); _logger.error('Request failed: ${e.message}',
tag: "DIO", traceId: traceId, spanId: spanId);
return handler.next(e); // 继续处理错误 return handler.next(e); // 继续处理错误
}, },
)); ));

View File

@@ -9,9 +9,14 @@ abstract class Logger {
_logger = logger; _logger = logger;
} }
void info(String msg, {String tag}); void debug(String msg,
void warning(String msg, {String tag}); {String? tag, String? traceId, String? spanId, String? parentSpanId});
void error(String msg, {String tag}); void info(String msg,
{String? tag, String? traceId, String? spanId, String? parentSpanId});
void warning(String msg,
{String? tag, String? traceId, String? spanId, String? parentSpanId});
void error(String msg,
{String? tag, String? traceId, String? spanId, String? parentSpanId});
} }
class LoggerConfig { class LoggerConfig {

View File

@@ -27,29 +27,64 @@ class LokiLogger implements Logger {
"Content-Type": "application/json", "Content-Type": "application/json",
"Content-Encoding": "gzip" "Content-Encoding": "gzip"
})); }));
@override @override
void info(String msg, {String? tag}) { void debug(String msg,
log(msg, level: LoggerLevel.info, tag: tag); {String? tag, String? traceId, String? spanId, String? parentSpanId}) {
log(msg,
level: LoggerLevel.debug,
tag: tag,
traceId: traceId,
spanId: spanId,
parentSpanId: parentSpanId);
} }
@override @override
void warning(String msg, {String? tag}) { void info(String msg,
log(msg, level: LoggerLevel.warning, tag: tag); {String? tag, String? traceId, String? spanId, String? parentSpanId}) {
log(msg,
level: LoggerLevel.info,
tag: tag,
traceId: traceId,
spanId: spanId,
parentSpanId: parentSpanId);
} }
@override @override
void error(String msg, {String? tag}) { void warning(String msg,
log(msg, level: LoggerLevel.error, tag: tag); {String? tag, String? traceId, String? spanId, String? parentSpanId}) {
log(msg,
level: LoggerLevel.warning,
tag: tag,
traceId: traceId,
spanId: spanId,
parentSpanId: parentSpanId);
} }
void log(String msg, {required LoggerLevel level, String? tag}) { @override
void error(String msg,
{String? tag, String? traceId, String? spanId, String? parentSpanId}) {
log(msg,
level: LoggerLevel.error,
tag: tag,
traceId: traceId,
spanId: spanId,
parentSpanId: parentSpanId);
}
void log(String msg,
{required LoggerLevel level,
String? tag,
String? traceId,
String? spanId,
String? parentSpanId}) {
if (level.level < this.level.level) { if (level.level < this.level.level) {
//日志等级小于设置的输出日志等级 //日志等级小于设置的输出日志等级
return; return;
} }
String log =
"traceId=$traceId, spanId=$spanId parentSpanId=$parentSpanId tag=$tag ${level.name.toUpperCase()} $msg";
if (_config == null) { if (_config == null) {
print("$tag $level $msg"); print(log);
} else { } else {
//推送到loki服务器 //推送到loki服务器
//{_config.url} //{_config.url}
@@ -59,14 +94,22 @@ class LokiLogger implements Logger {
var zip = gzip.encode(utf8.encode(jsonEncode({ var zip = gzip.encode(utf8.encode(jsonEncode({
"streams": [ "streams": [
{ {
"stream": {"service_name": _config.serviceName}, "stream": {
"service_name": _config.serviceName,
// "traceId": traceId,
// "spanId": spanId,
// "parentSpanId": parentSpanId
},
"values": [ "values": [
[nanoseconds.toString(), "$tag ${level.name.toUpperCase()} $msg"] [nanoseconds.toString(), log]
] ]
} }
] ]
}))); })));
dio.post("/loki/api/v1/push", data: zip); dio.post("/loki/api/v1/push", data: zip);
if (level == LoggerLevel.debug) {
print(log);
}
} }
} }
} }

40
lib/base/redis/redis.dart Normal file
View File

@@ -0,0 +1,40 @@
import 'package:redis/redis.dart';
class Redis {
final RedisConfig _config;
Command? _command;
Redis(this._config);
static late Redis _redis;
static Redis getInstance() {
return _redis;
}
static void setInstance(Redis redis) {
_redis = redis;
}
Future<void> connect() async {
_command = await RedisConnection().connect(_config.host, _config.port);
}
Future<bool> set(String key, String value) async {
var response = await _command?.send_object(["SET", key, value]);
return response == "OK";
}
Future<String?> get(String key) async {
return await _command?.send_object(["GET", key]);
}
}
class RedisConfig {
String host;
int port;
RedisConfig(
{required this.host,
this.port = 6379,});
}

View File

@@ -28,9 +28,11 @@ class ShelfWebServer implements WebServer {
final requestSpanId = request.context['request_span_id'] as String; final requestSpanId = request.context['request_span_id'] as String;
final requestParentSpanId = final requestParentSpanId =
request.context['request_parent_span_id'] as String?; request.context['request_parent_span_id'] as String?;
final String logPer = logger.info('| 请求路径: ${request.requestedUri}',
"traceId=$requestId spanId=$requestSpanId parentSpanId=$requestParentSpanId"; tag: tag,
logger.info('$logPer | 请求路径: ${request.requestedUri}', tag: tag); traceId: requestId,
spanId: requestSpanId,
parentSpanId: requestParentSpanId);
// final stopwatch = Stopwatch()..start(); // final stopwatch = Stopwatch()..start();
final stopwatch = request.context["request_stop_watch"] as Stopwatch; final stopwatch = request.context["request_stop_watch"] as Stopwatch;
stopwatch.start(); stopwatch.start();
@@ -39,18 +41,25 @@ class ShelfWebServer implements WebServer {
response = await innerHandler(request); response = await innerHandler(request);
stopwatch.stop(); stopwatch.stop();
logger.info( logger.info(
'$logPer | 响应状态码: ${response.statusCode} | 响应时间: ${stopwatch.elapsedMilliseconds}ms', '| 响应状态码: ${response.statusCode} | 响应时间: ${stopwatch.elapsedMilliseconds}ms',
tag: tag); tag: tag,
traceId: requestId,
spanId: requestSpanId,
parentSpanId: requestParentSpanId);
} catch (e, s) { } catch (e, s) {
if (e is HijackException) { if (e is HijackException) {
//不能处理该异常直接抛出 //不能处理该异常直接抛出
throw e; throw e;
} }
stopwatch.stop(); stopwatch.stop();
logger.error("$logPer | 服务器错误 | ${e.toString()} ${s.toString()}", logger.error("| 服务器错误 | ${e.toString()} ${s.toString()}",
tag: tag); tag: tag,
traceId: requestId,
spanId: requestSpanId,
parentSpanId: requestParentSpanId);
response = Response(500, response = Response(500,
body: "Internal Server Error\r\nTraceId:$requestId"); body:
"Internal Server Error\r\nTraceId:$requestId\r\nSpanId:$requestSpanId");
} }
return response; return response;
}; };
@@ -92,7 +101,7 @@ class ShelfWebServer implements WebServer {
_wsCall[path]!.open(spanId); _wsCall[path]!.open(spanId);
// 监听 WebSocket 消息 // 监听 WebSocket 消息
channel.stream.listen((message) { channel.stream.listen((message) {
print('Received message: $message'); // print('Received message: $message');
// 处理消息并回应客户端 // 处理消息并回应客户端
// channel.sink.add(message); // channel.sink.add(message);
_wsCall[path]!.message(spanId, message); _wsCall[path]!.message(spanId, message);
@@ -107,12 +116,16 @@ class ShelfWebServer implements WebServer {
final stopwatch = request.context["request_stop_watch"] as Stopwatch; final stopwatch = request.context["request_stop_watch"] as Stopwatch;
stopwatch.stop(); stopwatch.stop();
logger.info( logger.info(
'traceId=$requestId spanId=$spanId parentSpanId=$requestParentSpanId | WebSocket closed | 连接时长: ${stopwatch.elapsedMilliseconds}ms', 'parentSpanId=$requestParentSpanId | WebSocket closed | 连接时长: ${stopwatch.elapsedMilliseconds}ms',
tag: "webserver"); tag: "webserver",
traceId: requestId,
spanId: spanId);
}, onError: (error) { }, onError: (error) {
logger.info( logger.info(
'traceId=$requestId spanId=$spanId parentSpanId=$requestParentSpanId | WebSocket error: $error', 'parentSpanId=$requestParentSpanId | WebSocket error: $error',
tag: "webserver"); tag: "webserver",
traceId: requestId,
spanId: spanId);
}); });
})(request); })(request);
} else { } else {