244 lines
7.8 KiB
Dart
244 lines
7.8 KiB
Dart
import 'dart:io';
|
||
import 'dart:mirrors';
|
||
|
||
import 'package:EasyDartModule/EasyDartModule.dart';
|
||
import 'package:EasyDartModule/base/logger/Logger.dart';
|
||
import 'package:EasyDartModule/base/webserver/WebServer.dart';
|
||
import 'package:dart_jsonwebtoken/dart_jsonwebtoken.dart';
|
||
import 'package:shelf/shelf.dart';
|
||
import 'package:shelf/shelf_io.dart';
|
||
import 'package:shelf_router/shelf_router.dart';
|
||
import 'package:shelf_web_socket/shelf_web_socket.dart';
|
||
import 'package:uuid/uuid.dart';
|
||
|
||
/// A [WebServer] implementation built on `package:shelf`.
///
/// Plain HTTP routes are dispatched through a `shelf_router` [Router].
/// WebSocket endpoints live in a separate path -> [WebSocketHandler] table
/// that is consulted before the router. Every request served by [start] first
/// passes through [requestIdMiddleware] and [logRequests], which attach
/// trace/span identifiers to the request context and log timing information.
class ShelfWebServer implements WebServer {
  // Keys under which [requestIdMiddleware] stores per-request values in
  // `Request.context`; the other members read them back with the same keys.
  static const String _traceIdKey = 'request_trace_id';
  static const String _spanIdKey = 'request_span_id';
  static const String _parentSpanIdKey = 'request_parent_span_id';
  static const String _stopwatchKey = 'request_stop_watch';

  /// Logger that receives request, response and WebSocket lifecycle entries.
  Logger logger;

  /// The running server, or `null` until [start] has bound a port.
  HttpServer? _server;

  /// Router holding every non-WebSocket route.
  final Router _router = Router();

  /// Generator for trace and span identifiers.
  final Uuid uuid = Uuid();

  /// WebSocket handlers keyed by path without a leading `/`.
  final Map<String, WebSocketHandler> _wsCall = {};

  /// Creates a server that writes its log entries to [logger].
  ShelfWebServer(this.logger);

  /// Tag attached to every log entry written by this class.
  final String tag = "webserver";

  /// Strips one leading `/` so that registered WebSocket paths and
  /// `Request.url.path` (which shelf exposes without a leading slash) compare
  /// equal.
  static String _normalizeWsPath(String path) =>
      path.startsWith('/') ? path.substring(1) : path;

  /// Middleware that logs each request, its response status and its duration.
  ///
  /// Expects [requestIdMiddleware] to run earlier in the pipeline, because it
  /// reads the trace id, span id and stopwatch from the request context.
  /// Any error thrown by the inner handler is logged and converted into a
  /// `500` response carrying the trace and span ids, except
  /// [HijackException], which is propagated untouched.
  Middleware logRequests() {
    return (Handler innerHandler) {
      return (Request request) async {
        final traceId = request.context[_traceIdKey] as String;
        final spanId = request.context[_spanIdKey] as String;
        final parentSpanId = request.context[_parentSpanIdKey] as String?;
        logger.info('| 请求路径: ${request.requestedUri}',
            tag: tag,
            traceId: traceId,
            spanId: spanId,
            parentSpanId: parentSpanId);

        final stopwatch = request.context[_stopwatchKey] as Stopwatch;
        stopwatch.start();
        try {
          final response = await innerHandler(request);
          stopwatch.stop();
          logger.info(
              '| 响应状态码: ${response.statusCode} | 响应时间: ${stopwatch.elapsedMilliseconds}ms',
              tag: tag,
              traceId: traceId,
              spanId: spanId,
              parentSpanId: parentSpanId);
          return response;
        } on HijackException {
          // Not an error we can handle here: shelf uses it to signal that the
          // underlying connection was hijacked (the WebSocket branch of this
          // server triggers it). `rethrow` keeps the original stack trace.
          // The stopwatch is intentionally left running; the WebSocket
          // `onDone` callback stops it to report the connection duration.
          rethrow;
        } catch (e, s) {
          stopwatch.stop();
          logger.error("| 服务器错误 | $e $s",
              tag: tag,
              traceId: traceId,
              spanId: spanId,
              parentSpanId: parentSpanId);
          return Response(500,
              body:
                  "Internal Server Error\r\nTraceId:$traceId\r\nSpanId:$spanId");
        }
      };
    };
  }

  /// Middleware that attaches tracing identifiers to every request.
  ///
  /// The trace id is taken from the `X-Trace-ID` header, or freshly generated
  /// when the header is absent. An incoming `X-Span-ID` header becomes the
  /// parent span id, and a new span id is always generated for this request.
  /// An unstarted [Stopwatch] is stored alongside them for [logRequests].
  Middleware requestIdMiddleware() {
    return (Handler innerHandler) {
      return (Request request) async {
        final traceId = request.headers['X-Trace-ID'] ?? uuid.v4();
        // The caller's span (if any) is the parent of the span created here.
        final parentSpanId = request.headers['X-Span-ID'];
        final spanId = uuid.v4();
        final updatedRequest = request.change(context: {
          _traceIdKey: traceId,
          _spanIdKey: spanId,
          _parentSpanIdKey: parentSpanId,
          _stopwatchKey: Stopwatch(),
        });
        return innerHandler(updatedRequest);
      };
    };
  }

  /// Innermost handler: upgrades to a WebSocket when the request path matches
  /// a registered [WebSocketHandler], otherwise delegates to the HTTP router.
  Future<Response> _routerHandler(Request request) async {
    // Resolve the handler once so that open/message/close are always
    // delivered to the same instance for the lifetime of the connection.
    final wsHandler = _wsCall[_normalizeWsPath(request.url.path)];
    if (wsHandler == null) {
      return await _router.call(request);
    }

    final spanId = request.context[_spanIdKey] as String;
    final traceId = request.context[_traceIdKey] as String;
    final parentSpanId = request.context[_parentSpanIdKey] as String?;

    return await webSocketHandler((channel) {
      // The span id doubles as the connection id handed to the handler.
      wsHandler.chanelMap[spanId] = channel;
      wsHandler.open(spanId);

      channel.stream.listen((message) {
        wsHandler.message(spanId, message);
      }, onDone: () {
        try {
          wsHandler.close(spanId);
        } catch (e, s) {
          logger.error('| WebSocket close handler failed | $e $s',
              tag: tag, traceId: traceId, spanId: spanId);
        } finally {
          // Always forget the channel, even when the close callback throws,
          // so finished connections do not accumulate in the map.
          wsHandler.chanelMap.remove(spanId);
        }

        final stopwatch = request.context[_stopwatchKey] as Stopwatch;
        stopwatch.stop();
        logger.info(
            'parentSpanId=$parentSpanId | WebSocket closed | 连接时长: ${stopwatch.elapsedMilliseconds}ms',
            tag: tag,
            traceId: traceId,
            spanId: spanId);
      }, onError: (error) {
        logger.error('parentSpanId=$parentSpanId | WebSocket error: $error',
            tag: tag, traceId: traceId, spanId: spanId);
      });
    })(request);
  }

  /// Binds to [port] on all IPv4 interfaces and starts serving requests
  /// through the tracing and logging middleware.
  @override
  void start(int port) async {
    final handler = Pipeline()
        .addMiddleware(requestIdMiddleware())
        .addMiddleware(logRequests())
        .addHandler(_routerHandler);
    _server = await serve(handler, InternetAddress.anyIPv4, port);
    print('Server listening on port ${_server?.port}');
  }

  /// Closes the running server, if any.
  @override
  void stop() {
    _server?.close();
  }

  /// Registers the routes declared on [handler] through [RequestMapping]
  /// annotations, discovered with `dart:mirrors`.
  ///
  /// A class-level [RequestMapping] supplies the base path (one trailing `/`
  /// is dropped). When that mapping uses [HttpMethod.WS], [handler] itself is
  /// registered as the WebSocket handler for the path and its methods are not
  /// scanned. Otherwise every method declared directly on the class that
  /// carries a [RequestMapping] is registered under `basePath/methodPath`.
  @override
  void addHandler(handler) {
    final classMirror = reflectClass(handler.runtimeType);
    final instanceMirror = reflect(handler);

    String basePath = "";
    for (final metadata in classMirror.metadata) {
      final Object? annotation = metadata.reflectee;
      if (annotation is! RequestMapping) continue;

      basePath = annotation.path;
      if (basePath.endsWith("/")) {
        basePath = basePath.substring(0, basePath.length - 1);
      }
      if (annotation.method == HttpMethod.WS) {
        // WebSocket handler objects are routed as a whole.
        addRouter(annotation.method, basePath, handler);
        return;
      }
      // Only the first class-level mapping is honoured.
      break;
    }

    for (final declaration in instanceMirror.type.declarations.values) {
      if (declaration is! MethodMirror) continue;
      final MethodMirror method = declaration;

      for (final metadata in method.metadata) {
        final Object? mapping = metadata.reflectee;
        if (mapping is! RequestMapping) continue;

        String methodPath = mapping.path;
        if (methodPath.startsWith("/")) {
          methodPath = methodPath.substring(1);
        }
        final routePath = "$basePath/$methodPath";

        // The optional positional parameters absorb any route parameters the
        // router passes along; they are not forwarded to the target method.
        addRouter(mapping.method, routePath, (Request req, [a, b, c, d, e, f]) {
          final token = req.headers["token"];
          Object? payload;
          if (token != null) {
            try {
              // NOTE(review): JWT.decode only parses the token and does NOT
              // verify its signature, so the payload is unauthenticated
              // client input. Use JWT.verify with the signing key before
              // trusting it for authorization decisions.
              payload = JWT.decode(token).payload;
            } catch (e) {
              // A malformed token is treated like a missing one.
              logger.info('| JWT decode failed | $e',
                  tag: tag,
                  traceId: req.context[_traceIdKey] as String,
                  spanId: req.context[_spanIdKey] as String);
            }
          }

          // Two-parameter methods receive (request, payload); all other
          // methods are invoked with the request only.
          final args = method.parameters.length == 2
              ? <dynamic>[req, payload ?? <String, dynamic>{}]
              : <dynamic>[req];
          return instanceMirror.invoke(method.simpleName, args).reflectee;
        });
      }
    }
  }

  /// Registers [handler] for [path].
  ///
  /// For [HttpMethod.WS], [handler] must be a [WebSocketHandler]; anything
  /// else is rejected with a console message. A leading `/` on a WebSocket
  /// path is ignored so that `/ws` and `ws` address the same endpoint. For
  /// every other method the handler is added to the HTTP router as-is.
  void addRouter(HttpMethod method, String path, dynamic handler) {
    if (method != HttpMethod.WS) {
      _router.add(method.name, path, handler);
      return;
    }
    if (handler is WebSocketHandler) {
      _wsCall[_normalizeWsPath(path)] = handler;
    } else {
      print("回调函数类型错误 需要 WebSocketHandler ");
    }
  }
}
|
||
|
||
/// Base class for WebSocket endpoint handlers.
///
/// Subclasses implement the lifecycle callbacks [open], [message] and
/// [close]; [sendData] pushes data to a connection recorded in [chanelMap].
abstract class WebSocketHandler {
  /// Connection objects keyed by connection id.
  ///
  /// Values are dynamically typed; [sendData] calls `.sink.add(...)` on them.
  Map<String, dynamic> chanelMap = {};

  /// Called for the connection identified by [id] when it is opened.
  void open(String id);

  /// Called for the connection identified by [id] when it is closed.
  void close(String id);

  /// Called with each [message] received on the connection [id].
  void message(String id, dynamic message);

  /// Adds [data] to the sink of the connection stored under [id].
  ///
  /// Prints a notice and does nothing else when [id] is not in [chanelMap].
  void sendData(String id, dynamic data) {
    final isKnown = chanelMap.containsKey(id);
    if (!isKnown) {
      print("id未找到");
      return;
    }
    chanelMap[id].sink.add(data);
  }
}
|