Files
easy_dart_module/lib/base/webserver/impl/ShelfWebServer.dart

267 lines
8.7 KiB
Dart
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import 'dart:io';
import 'package:EasyDartModule/EasyDartModule.dart';
import 'package:EasyDartModule/base/logger/Logger.dart';
import 'package:EasyDartModule/base/webserver/WebServer.dart';
import 'package:dart_jsonwebtoken/dart_jsonwebtoken.dart';
import 'package:shelf/shelf.dart';
import 'package:shelf/shelf_io.dart';
import 'package:shelf_router/shelf_router.dart';
import 'package:shelf_web_socket/shelf_web_socket.dart';
import 'package:uuid/uuid.dart';
class ShelfWebServer implements WebServer {
late Logger logger;
HttpServer? _server;
final Router _router = Router();
final Uuid uuid = Uuid();
final Map<String, WebSocketHandler> _wsCall = {};
ShelfWebServer(this.logger);
final String tag = "webserver";
// 中间件:记录请求日志
Middleware logRequests() {
return (Handler innerHandler) {
return (Request request) async {
final requestId = request.context['request_trace_id'] as String;
final requestSpanId = request.context['request_span_id'] as String;
final requestParentSpanId =
request.context['request_parent_span_id'] as String?;
logger.info('| 请求路径: ${request.method} ${request.requestedUri}',
tag: tag,
traceId: requestId,
spanId: requestSpanId,
parentSpanId: requestParentSpanId);
// final stopwatch = Stopwatch()..start();
final stopwatch = request.context["request_stop_watch"] as Stopwatch;
stopwatch.start();
Response response;
try {
response = await innerHandler(request);
stopwatch.stop();
logger.info(
'| 响应状态码: ${response.statusCode} | 响应时间: ${stopwatch.elapsedMilliseconds}ms',
tag: tag,
traceId: requestId,
spanId: requestSpanId,
parentSpanId: requestParentSpanId);
} catch (e, s) {
if (e is HijackException) {
//不能处理该异常直接抛出
rethrow;
}
stopwatch.stop();
logger.error("| 服务器错误 | ${e.toString()} ${s.toString()}",
tag: tag,
traceId: requestId,
spanId: requestSpanId,
parentSpanId: requestParentSpanId);
response = Response(500,
body:
"Internal Server Error\r\nTraceId:$requestId\r\nSpanId:$requestSpanId");
}
return response;
};
};
}
// 中间件为每个请求生成唯一的请求ID
Middleware requestIdMiddleware() {
return (Handler innerHandler) {
return (Request request) async {
// 获取请求头中的 X-Request-ID如果没有则生成一个
final requestId = request.headers['X-Trace-ID'] ?? uuid.v4();
String? spanId = request.headers['X-Span-ID'];
String? parentSpanId;
if (spanId != null) {
parentSpanId = spanId;
}
spanId = uuid.v4();
final updatedRequest = request.change(context: {
'request_trace_id': requestId,
"request_span_id": spanId,
"request_parent_span_id": parentSpanId,
"request_stop_watch": Stopwatch()
});
return innerHandler(updatedRequest);
};
};
}
Future<Response> _routerHandler(Request request) async {
String path = request.url.path;
if (_wsCall.containsKey(path)) {
final String spanId = request.context['request_span_id'] as String;
final String requestId = request.context['request_trace_id'] as String;
final String? requestParentSpanId =
request.context['request_parent_span_id'] as String?;
return await webSocketHandler((channel) {
_wsCall[path]!.chanelMap[spanId] = channel;
_wsCall[path]!.open(spanId);
// 监听 WebSocket 消息
channel.stream.listen((message) {
// print('Received message: $message');
// 处理消息并回应客户端
// channel.sink.add(message);
_wsCall[path]!.message(spanId, message);
}, onDone: () {
try {
_wsCall[path]!.close(spanId);
_wsCall[path]!.chanelMap.remove(spanId);
} catch (e) {
print(e);
}
final stopwatch = request.context["request_stop_watch"] as Stopwatch;
stopwatch.stop();
logger.info(
'parentSpanId=$requestParentSpanId | WebSocket closed | 连接时长: ${stopwatch.elapsedMilliseconds}ms',
tag: "webserver",
traceId: requestId,
spanId: spanId);
}, onError: (error) {
logger.info(
'parentSpanId=$requestParentSpanId | WebSocket error: $error',
tag: "webserver",
traceId: requestId,
spanId: spanId);
});
})(request);
} else {
return await _router.call(request);
}
}
@override
void start(int port, {Function? interceptor}) async {
//反射获取全部路由地址
var pipeline = Pipeline()
.addMiddleware(requestIdMiddleware())
.addMiddleware(logRequests());
//判断拦截器类型
if (interceptor != null) {
pipeline = pipeline.addMiddleware((Handler innerHandler) {
return (Request request) async {
var response = interceptor(request);
if (response == null) {
return innerHandler(request);
}
return response;
};
});
}
final handler = pipeline.addHandler(_routerHandler);
_server = await serve(handler, InternetAddress.anyIPv4, port);
print('Server listening on port ${_server?.port}');
}
@override
void stop() {
_server?.close();
}
@override
void addHandler(handler) {
try {
handler.callHandler = handler;
} catch (e) {
print(
"请在 ${handler.runtimeType} 中实现set方法 set callHandler(handler) => _callHandler = handler;");
return;
}
try {
handler.routeMap.forEach((k, v) {
for (var p in v) {
if (k == HttpMethod.WS) {
addRouter(k, p[0], handler);
continue;
}
addRouter(k, p[0], (req, [a, b, c, d, e, f]) async {
//从request中获取token并解码传入
String? token = req.headers["token"];
dynamic payload;
if (token != null) {
try {
payload = JWT.decode(token).payload;
} catch (e) {
//jwt解码失败
print(e);
}
}
var response;
if (p[3] == 2) {
response = await p[2](req, payload ?? <String, dynamic>{});
} else {
response = await p[2](req);
}
Map<String, String> headers = {...response.headers};
headers['Access-Control-Allow-Origin'] = '*';
headers['Access-Control-Allow-Headers'] =
'Content-Type,X-Span-ID,X-Trace-ID,token';
headers["Access-Control-Expose-Headers"] =
"Content-Type,X-Span-ID,X-Trace-ID,token";
headers['Access-Control-Allow-Methods'] =
'GET, POST, PUT, DELETE, OPTIONS';
headers["content-type"] = p[1].type;
response = response.change(headers: headers);
return response;
});
}
});
} catch (e) {
if (e is NoSuchMethodError) {
print(
"请在 ${handler.runtimeType} 中实现get方法 Map<HttpMethod, List<List>> get routeMap => routes;");
return;
} else {
rethrow;
}
}
}
void addRouter(HttpMethod method, String path, dynamic handler) {
if (method == HttpMethod.WS) {
//添加websocket连接处理函数
if (handler is WebSocketHandler) {
_wsCall[path] = handler;
} else {
print("回调函数类型错误 需要 WebSocketHandler ");
}
} else {
if (method.name == "ALL") {
_router.all(path, handler);
} else {
_router.add(method.name, path, handler);
}
if (method.name != "OPTIONS") {
_router.options(path, (r) {
return Response.ok('', headers: {
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Content-Type,X-Span-ID,X-Trace-ID,token',
'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
});
});
}
}
}
}
abstract class WebSocketHandler {
Map<String, dynamic> chanelMap = {};
void open(String id);
void close(String id);
void message(String id, dynamic message);
void sendData(String id, dynamic data) {
if (chanelMap.containsKey(id)) {
chanelMap[id].sink.add(data);
} else {
print("id未找到");
}
}
}