import 'dart:io';
import 'dart:mirrors';

import 'package:EasyDartModule/EasyDartModule.dart';
import 'package:EasyDartModule/base/logger/Logger.dart';
import 'package:EasyDartModule/base/webserver/WebServer.dart';
import 'package:dart_jsonwebtoken/dart_jsonwebtoken.dart';
import 'package:shelf/shelf.dart';
import 'package:shelf/shelf_io.dart';
import 'package:shelf_router/shelf_router.dart';
import 'package:shelf_web_socket/shelf_web_socket.dart';
import 'package:uuid/uuid.dart';

/// A [WebServer] implementation built on `package:shelf`.
///
/// HTTP routes are discovered by reflecting over [RequestMapping] annotations
/// (see [addHandler]) and dispatched through a shelf [Router]. WebSocket
/// endpoints are kept in a separate table and matched in [_routerHandler]
/// before the router is consulted.
class ShelfWebServer implements WebServer {
  /// Logger that receives request, response and WebSocket lifecycle records.
  Logger logger;

  /// The bound HTTP server, or `null` until [start] has completed.
  HttpServer? _server;

  /// Router holding every non-WebSocket route.
  final Router _router = Router();

  /// Generator for trace and span identifiers.
  final Uuid uuid = Uuid();

  /// WebSocket handlers keyed by path without a leading `/`
  /// (see [_normalizeWsPath]).
  final Map<String, WebSocketHandler> _wsCall = {};

  ShelfWebServer(this.logger);

  /// Tag attached to every log record emitted by this server.
  final String tag = "webserver";

  /// Middleware that logs the requested URI, then the response status and
  /// elapsed time.
  ///
  /// Reads the `request_trace_id`, `request_span_id`,
  /// `request_parent_span_id` and `request_stop_watch` context entries, so it
  /// must be installed after [requestIdMiddleware] in the pipeline.
  ///
  /// Any error thrown by the inner handler is logged and converted into a
  /// 500 response carrying the trace and span ids. [HijackException] is the
  /// exception: shelf uses it to signal a hijacked connection (e.g. a
  /// WebSocket upgrade), so it is passed through untouched.
  Middleware logRequests() {
    return (Handler innerHandler) {
      return (Request request) async {
        final requestId = request.context['request_trace_id'] as String;
        final requestSpanId = request.context['request_span_id'] as String;
        final requestParentSpanId =
            request.context['request_parent_span_id'] as String?;
        logger.info('| 请求路径: ${request.requestedUri}',
            tag: tag,
            traceId: requestId,
            spanId: requestSpanId,
            parentSpanId: requestParentSpanId);

        final stopwatch = request.context["request_stop_watch"] as Stopwatch;
        stopwatch.start();
        try {
          final response = await innerHandler(request);
          stopwatch.stop();
          logger.info(
              '| 响应状态码: ${response.statusCode} | 响应时间: ${stopwatch.elapsedMilliseconds}ms',
              tag: tag,
              traceId: requestId,
              spanId: requestSpanId,
              parentSpanId: requestParentSpanId);
          return response;
        } on HijackException {
          // Must not be handled here. The stopwatch is deliberately left
          // running: the WebSocket branch stops it when the socket closes.
          rethrow;
        } catch (e, s) {
          stopwatch.stop();
          logger.error("| 服务器错误 | $e $s",
              tag: tag,
              traceId: requestId,
              spanId: requestSpanId,
              parentSpanId: requestParentSpanId);
          return Response(500,
              body:
                  "Internal Server Error\r\nTraceId:$requestId\r\nSpanId:$requestSpanId");
        }
      };
    };
  }

  /// Middleware that attaches tracing information to every request.
  ///
  /// The trace id is taken from the `X-Trace-ID` header, or freshly generated
  /// when the header is absent. The incoming `X-Span-ID` header (if any)
  /// becomes the parent span id, and a new span id is always generated for
  /// this request. An unstarted [Stopwatch] is stored as well so later stages
  /// can measure the request duration.
  Middleware requestIdMiddleware() {
    return (Handler innerHandler) {
      return (Request request) async {
        final requestId = request.headers['X-Trace-ID'] ?? uuid.v4();
        // The caller's span, when provided, is the parent of our own span.
        final parentSpanId = request.headers['X-Span-ID'];
        final spanId = uuid.v4();

        final updatedRequest = request.change(context: {
          'request_trace_id': requestId,
          "request_span_id": spanId,
          "request_parent_span_id": parentSpanId,
          "request_stop_watch": Stopwatch()
        });

        return innerHandler(updatedRequest);
      };
    };
  }

  /// Returns [path] without its leading `/`, if it has one.
  ///
  /// shelf's `Request.url.path` never starts with a slash, whereas paths
  /// taken from [RequestMapping] annotations usually do; normalising both
  /// sides lets them compare equal.
  String _normalizeWsPath(String path) =>
      path.startsWith('/') ? path.substring(1) : path;

  /// Terminal pipeline handler.
  ///
  /// Requests whose path matches a registered [WebSocketHandler] are upgraded
  /// to a WebSocket; everything else is delegated to the HTTP [_router].
  ///
  /// For WebSocket connections the request's span id doubles as the
  /// connection id passed to the handler's `open`/`message`/`close` callbacks
  /// and used as the key in its `chanelMap`.
  Future<Response> _routerHandler(Request request) async {
    final wsHandler = _wsCall[_normalizeWsPath(request.url.path)];
    if (wsHandler == null) {
      return await _router.call(request);
    }

    final spanId = request.context['request_span_id'] as String;
    final requestId = request.context['request_trace_id'] as String;
    final requestParentSpanId =
        request.context['request_parent_span_id'] as String?;

    // `wsHandler` is captured once so that open/message/close for a given
    // connection are always delivered to the same handler instance.
    return await webSocketHandler((channel) {
      wsHandler.chanelMap[spanId] = channel;
      wsHandler.open(spanId);

      // Forward every incoming frame to the handler.
      channel.stream.listen((message) {
        wsHandler.message(spanId, message);
      }, onDone: () {
        try {
          wsHandler.close(spanId);
        } catch (e, s) {
          logger.error('| WebSocket close callback failed | $e $s',
              tag: tag,
              traceId: requestId,
              spanId: spanId,
              parentSpanId: requestParentSpanId);
        } finally {
          // Always drop the channel, even when the close callback throws,
          // so closed connections do not accumulate in the map.
          wsHandler.chanelMap.remove(spanId);
        }

        final stopwatch = request.context["request_stop_watch"] as Stopwatch;
        stopwatch.stop();
        logger.info(
            'parentSpanId=$requestParentSpanId | WebSocket closed | 连接时长: ${stopwatch.elapsedMilliseconds}ms',
            tag: tag,
            traceId: requestId,
            spanId: spanId);
      }, onError: (error) {
        logger.error(
            'parentSpanId=$requestParentSpanId | WebSocket error: $error',
            tag: tag,
            traceId: requestId,
            spanId: spanId,
            parentSpanId: requestParentSpanId);
      });
    })(request);
  }

  /// Binds the server to [port] on all IPv4 interfaces and starts serving.
  ///
  /// The returned future completes once the socket is bound; awaiting it lets
  /// the caller observe bind failures such as a port that is already in use.
  @override
  Future<void> start(int port) async {
    // Order matters: the trace context must exist before logRequests reads it.
    final handler = Pipeline()
        .addMiddleware(requestIdMiddleware())
        .addMiddleware(logRequests())
        .addHandler(_routerHandler);

    _server = await serve(handler, InternetAddress.anyIPv4, port);
    print('Server listening on port ${_server?.port}');
  }

  /// Asks the underlying server, if any, to stop listening.
  ///
  /// Does not wait for the shutdown to complete.
  @override
  void stop() {
    _server?.close();
  }

  /// Registers the routes declared on [handler] through [RequestMapping]
  /// annotations.
  ///
  /// The first class-level [RequestMapping] supplies the base path (a single
  /// trailing `/` is removed). If its method is [HttpMethod.WS], [handler]
  /// itself is registered as a WebSocket endpoint and no methods are scanned.
  ///
  /// Otherwise every method carrying a [RequestMapping] is registered at
  /// `<base path>/<method path>`. Such a method is invoked with the request
  /// and, when it declares exactly two parameters, with the decoded payload
  /// of the JWT found in the `token` header (an empty map if the header is
  /// missing or cannot be decoded). Responses lacking a `content-type` header
  /// receive the mapping's default response type.
  @override
  void addHandler(handler) {
    ClassMirror cm = reflectClass(handler.runtimeType);
    final im = reflect(handler);

    var path = "";
    for (final metadata in cm.metadata) {
      final annotation = metadata.reflectee;
      if (annotation is! RequestMapping) continue;

      path = annotation.path;
      if (path.endsWith("/")) {
        path = path.substring(0, path.length - 1);
      }
      if (annotation.method == HttpMethod.WS) {
        // A WebSocket handler is registered as a whole object.
        addRouter(annotation.method, path, handler);
        return;
      }
      break;
    }

    for (final declaration in im.type.declarations.values) {
      if (declaration is! MethodMirror) continue;

      for (final m in declaration.metadata) {
        final mapping = m.reflectee;
        if (mapping is! RequestMapping) continue;

        String methodPath = mapping.path;
        if (methodPath.startsWith("/")) {
          methodPath = methodPath.substring(1);
        }
        final routePath = "$path/$methodPath";

        // The optional positional parameters absorb up to six route
        // parameters that the router may pass after the request.
        addRouter(mapping.method, routePath,
            (Request req, [a, b, c, d, e, f]) async {
          // NOTE(security): JWT.decode only parses the token; it does NOT
          // verify the signature. The payload handed to the handler is
          // therefore unauthenticated unless it is verified elsewhere.
          final token = req.headers["token"];
          dynamic payload;
          if (token != null) {
            try {
              payload = JWT.decode(token).payload;
            } catch (e) {
              // Best effort: an undecodable token is treated like a missing
              // one, but the failure is recorded against the request trace.
              logger.info('| JWT decode failed | $e',
                  tag: tag,
                  traceId: req.context['request_trace_id'] as String,
                  spanId: req.context['request_span_id'] as String,
                  parentSpanId:
                      req.context['request_parent_span_id'] as String?);
            }
          }

          final List<dynamic> args = declaration.parameters.length == 2
              ? [req, payload ?? {}]
              : [req];

          // The reflected method may return a Response or a Future of one.
          final result =
              await im.invoke(declaration.simpleName, args).reflectee;
          var response = result as Response;

          if (response.headers["content-type"] == null) {
            // Response.change merges with the existing headers, so only the
            // new entry is passed. Re-sending `response.headers` would
            // collapse multi-valued headers (e.g. Set-Cookie) into a single
            // comma-joined value.
            response = response.change(
                headers: {"content-type": m.reflectee.responseType.type});
          }
          return response;
        });
      }
    }
  }

  /// Registers [handler] for [method] at [path].
  ///
  /// For [HttpMethod.WS], [handler] must be a [WebSocketHandler]; anything
  /// else is reported on stdout and ignored. For every other method,
  /// [handler] is added to the HTTP router under the method's name.
  void addRouter(HttpMethod method, String path, dynamic handler) {
    if (method != HttpMethod.WS) {
      _router.add(method.name, path, handler);
      return;
    }

    if (handler is WebSocketHandler) {
      _wsCall[_normalizeWsPath(path)] = handler;
    } else {
      print("回调函数类型错误 需要 WebSocketHandler ");
    }
  }
}

/// Base class for WebSocket endpoints served by [ShelfWebServer].
///
/// The server calls [open] when a client connects, [message] for every
/// received frame and [close] when the connection ends. Each connection is
/// identified by an id chosen by the server.
abstract class WebSocketHandler {
  /// Open channels keyed by connection id.
  ///
  /// Entries are added and removed by the server around [open] and [close].
  Map chanelMap = {};

  /// Called after the connection identified by [id] has been established.
  void open(String id);

  /// Called when the connection identified by [id] has ended.
  void close(String id);

  /// Called for every [message] received on the connection [id].
  void message(String id, dynamic message);

  /// Sends [data] to the connection identified by [id].
  ///
  /// Prints a notice and does nothing when no such connection is open.
  void sendData(String id, dynamic data) {
    if (!chanelMap.containsKey(id)) {
      print("id未找到");
      return;
    }
    chanelMap[id].sink.add(data);
  }
}