import 'dart:convert'; import 'dart:io'; import 'package:EasyDartModule/EasyDartModule.dart'; import 'package:EasyDartModule/base/logger/Logger.dart'; import 'package:EasyDartModule/base/webserver/WebServer.dart'; import 'package:dart_jsonwebtoken/dart_jsonwebtoken.dart'; import 'package:shelf/shelf.dart'; import 'package:shelf/shelf_io.dart'; import 'package:shelf_router/shelf_router.dart'; import 'package:shelf_web_socket/shelf_web_socket.dart'; import 'package:uuid/uuid.dart'; class ShelfWebServer implements WebServer { late Logger logger; HttpServer? _server; final Router _router = Router(); final Uuid uuid = Uuid(); final Map _wsCall = {}; Future Function(String?)? tokenCheck; Response? Function(Request, Map)? interceptor; ShelfWebServer(this.logger); final String tag = "webserver"; Stream> encodeStream( String data, { Encoding encoding = utf8, }) async* { yield encoding.encode(data); } Future decodeStream(Stream> stream, {Encoding encoding = utf8}) { return encoding.decodeStream(stream); } // 中间件:记录请求日志 Middleware logRequests() { return (Handler innerHandler) { return (Request request) async { String body = ""; if (request.mimeType == null || request.mimeType != "multipart/form-data") { body = await request.readAsString(); request = request.change(body: body); } final requestId = request.context['request_trace_id'] as String; final requestSpanId = request.context['request_span_id'] as String; final requestParentSpanId = request.context['request_parent_span_id'] as String?; logger.info('| 请求路径: ${request.method} ${request.requestedUri} $body', tag: tag, traceId: requestId, spanId: requestSpanId, parentSpanId: requestParentSpanId); // final stopwatch = Stopwatch()..start(); final stopwatch = request.context["request_stop_watch"] as Stopwatch; stopwatch.start(); Response response; try { response = await innerHandler(request); stopwatch.stop(); logger.info( '| 响应状态码: ${response.statusCode} | 响应时间: ${stopwatch.elapsedMilliseconds}ms', tag: tag, traceId: requestId, spanId: requestSpanId, parentSpanId: requestParentSpanId); } catch (e, s) { if (e is HijackException) { //不能处理该异常直接抛出 rethrow; } stopwatch.stop(); logger.error("| 服务器错误 | ${e.toString()} ${s.toString()}", tag: tag, traceId: requestId, spanId: requestSpanId, parentSpanId: requestParentSpanId); response = Response(500, body: "Internal Server Error\r\nTraceId:$requestId\r\nSpanId:$requestSpanId"); } return response; }; }; } // 中间件:为每个请求生成唯一的请求ID Middleware requestIdMiddleware() { return (Handler innerHandler) { return (Request request) async { // 获取请求头中的 X-Request-ID,如果没有则生成一个 final requestId = request.headers['X-Trace-ID'] ?? uuid.v4(); String? spanId = request.headers['X-Span-ID']; String? parentSpanId; if (spanId != null) { parentSpanId = spanId; } spanId = uuid.v4(); final updatedRequest = request.change(context: { 'request_trace_id': requestId, "request_span_id": spanId, "request_parent_span_id": parentSpanId, "request_stop_watch": Stopwatch() }); return innerHandler(updatedRequest); }; }; } Future _routerHandler(Request request) async { String path = request.url.path; if (_wsCall.containsKey(path)) { final String spanId = request.context['request_span_id'] as String; final String requestId = request.context['request_trace_id'] as String; final String? requestParentSpanId = request.context['request_parent_span_id'] as String?; return await webSocketHandler((channel) { _wsCall[path]!.chanelMap[spanId] = channel; _wsCall[path]!.open(spanId); // 监听 WebSocket 消息 channel.stream.listen((message) { // print('Received message: $message'); // 处理消息并回应客户端 // channel.sink.add(message); _wsCall[path]!.message(spanId, message); }, onDone: () { try { _wsCall[path]!.close(spanId); _wsCall[path]!.chanelMap.remove(spanId); } catch (e) { print(e); } final stopwatch = request.context["request_stop_watch"] as Stopwatch; stopwatch.stop(); logger.info( 'parentSpanId=$requestParentSpanId | WebSocket closed | 连接时长: ${stopwatch.elapsedMilliseconds}ms', tag: "webserver", traceId: requestId, spanId: spanId); }, onError: (error) { logger.info( 'parentSpanId=$requestParentSpanId | WebSocket error: $error', tag: "webserver", traceId: requestId, spanId: spanId); }); })(request); } else { return await _router.call(request); } } @override void start(int port, {Response? Function(Request, Map)? interceptor, Future Function(String?)? tokenCheck}) async { var pipeline = Pipeline() //生成请求id .addMiddleware(requestIdMiddleware()) //api日志记录 .addMiddleware(logRequests()); //添加自定义拦截器 if (interceptor != null) { this.interceptor = interceptor; } if (tokenCheck != null) { this.tokenCheck = tokenCheck; } final handler = pipeline.addHandler(_routerHandler); _server = await serve(handler, InternetAddress.anyIPv4, port); print('Server listening on port ${_server?.port}'); } @override void stop() { _server?.close(); } Response setResponseHeader(response, contentType) { Map headers = {...response.headers}; headers['Access-Control-Allow-Origin'] = '*'; headers['Access-Control-Allow-Headers'] = 'Content-Type,X-Span-ID,X-Trace-ID,token'; headers["Access-Control-Expose-Headers"] = "Content-Type,X-Span-ID,X-Trace-ID,token"; headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, DELETE, OPTIONS'; var uct = headers["user-content-type"]; if (uct != null) { headers.remove("user-content-type"); headers["content-type"] = uct; } else { headers["content-type"] = contentType; } return response.change(headers: headers); } @override void addHandler(handler) { try { handler.callHandler = handler; } catch (e) { print( "请在 ${handler.runtimeType} 中实现set方法 set callHandler(handler) => _callHandler = handler;"); return; } try { handler.routeMap.forEach((k, v) { for (var p in v) { if (k == HttpMethod.WS) { addRouter(k, p[0], handler); continue; } addRouter(k, p[0], (Request req, [a, b, c, d, e, f]) async { //从request中获取token并解码传入 bool auth = true; if (p.length >= 5) { //判断是否需要进行token校验 auth = p[4]; } dynamic payload; var response; if (auth) { String? token = req.headers["token"]; if (token == null) { token = req.url.queryParameters["token"]; } bool check = true; String checkMsg = "token为空"; if (tokenCheck != null) { //自定义token校验函数存在 var checkResult = await tokenCheck!(token); if (!checkResult.status) { //校验失败 response = Response.ok(checkResult.errMsg); return setResponseHeader(response, p[1].type); } payload = checkResult.payload; check = true; } else { if (token != null) { try { payload = JWT.decode(token).payload; check = true; } catch (e) { //jwt解码失败 print("token: $token msg: $e"); checkMsg = "身份认证失败"; check = false; } } else { check = false; } } if (!check) { response = Response.ok(jsonEncode({"code": -10000, "msg": checkMsg})); return setResponseHeader(response, p[1].type); } } //自定义拦截函数 if (interceptor != null) { var response = interceptor!(req, payload ?? {}); if (response != null) { return response; } } if (p[3] == 2) { response = await p[2](req, payload ?? {}); } else { response = await p[2](req); } if (response is! Response) { //返回数据不是 Response 对象 response = Response.ok(response.toString()); } response = setResponseHeader(response, p[1].type); return response; }); } }); } catch (e) { if (e is NoSuchMethodError) { print( "请在 ${handler.runtimeType} 中实现get方法 Map> get routeMap => routes;"); return; } else { rethrow; } } } void addRouter(HttpMethod method, String path, dynamic handler) { if (method == HttpMethod.WS) { //添加websocket连接处理函数 if (handler is WebSocketHandler) { _wsCall[path] = handler; } else { print("回调函数类型错误 需要 WebSocketHandler "); } } else { if (method.name == "ALL") { _router.all(path, handler); } else { _router.add(method.name, path, handler); } if (method.name != "OPTIONS") { _router.options(path, (r) { return Response.ok('', headers: { 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Headers': 'Content-Type,X-Span-ID,X-Trace-ID,token', 'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS', }); }); } } } } abstract class WebSocketHandler { Map chanelMap = {}; void open(String id); void close(String id); void message(String id, dynamic message); void sendData(String id, dynamic data) { if (chanelMap.containsKey(id)) { chanelMap[id].sink.add(data); } else { print("id未找到"); } } }