314 lines
10 KiB
Dart
314 lines
10 KiB
Dart
import 'dart:convert';
|
||
import 'dart:io';
|
||
|
||
import 'package:EasyDartModule/EasyDartModule.dart';
|
||
import 'package:EasyDartModule/base/logger/Logger.dart';
|
||
import 'package:EasyDartModule/base/webserver/WebServer.dart';
|
||
import 'package:dart_jsonwebtoken/dart_jsonwebtoken.dart';
|
||
import 'package:shelf/shelf.dart';
|
||
import 'package:shelf/shelf_io.dart';
|
||
import 'package:shelf_router/shelf_router.dart';
|
||
import 'package:shelf_web_socket/shelf_web_socket.dart';
|
||
import 'package:uuid/uuid.dart';
|
||
|
||
class ShelfWebServer implements WebServer {
|
||
late Logger logger;
|
||
HttpServer? _server;
|
||
final Router _router = Router();
|
||
final Uuid uuid = Uuid();
|
||
final Map<String, WebSocketHandler> _wsCall = {};
|
||
TokenResult Function(String?)? tokenCheck;
|
||
Response? Function(Request, Map<String, dynamic>)? interceptor;
|
||
|
||
ShelfWebServer(this.logger);
|
||
final String tag = "webserver";
|
||
|
||
// 中间件:记录请求日志
|
||
Middleware logRequests() {
|
||
return (Handler innerHandler) {
|
||
return (Request request) async {
|
||
final requestId = request.context['request_trace_id'] as String;
|
||
final requestSpanId = request.context['request_span_id'] as String;
|
||
final requestParentSpanId =
|
||
request.context['request_parent_span_id'] as String?;
|
||
logger.info('| 请求路径: ${request.method} ${request.requestedUri}',
|
||
tag: tag,
|
||
traceId: requestId,
|
||
spanId: requestSpanId,
|
||
parentSpanId: requestParentSpanId);
|
||
// final stopwatch = Stopwatch()..start();
|
||
final stopwatch = request.context["request_stop_watch"] as Stopwatch;
|
||
stopwatch.start();
|
||
Response response;
|
||
try {
|
||
response = await innerHandler(request);
|
||
stopwatch.stop();
|
||
logger.info(
|
||
'| 响应状态码: ${response.statusCode} | 响应时间: ${stopwatch.elapsedMilliseconds}ms',
|
||
tag: tag,
|
||
traceId: requestId,
|
||
spanId: requestSpanId,
|
||
parentSpanId: requestParentSpanId);
|
||
} catch (e, s) {
|
||
if (e is HijackException) {
|
||
//不能处理该异常直接抛出
|
||
rethrow;
|
||
}
|
||
stopwatch.stop();
|
||
logger.error("| 服务器错误 | ${e.toString()} ${s.toString()}",
|
||
tag: tag,
|
||
traceId: requestId,
|
||
spanId: requestSpanId,
|
||
parentSpanId: requestParentSpanId);
|
||
response = Response(500,
|
||
body:
|
||
"Internal Server Error\r\nTraceId:$requestId\r\nSpanId:$requestSpanId");
|
||
}
|
||
return response;
|
||
};
|
||
};
|
||
}
|
||
|
||
// 中间件:为每个请求生成唯一的请求ID
|
||
Middleware requestIdMiddleware() {
|
||
return (Handler innerHandler) {
|
||
return (Request request) async {
|
||
// 获取请求头中的 X-Request-ID,如果没有则生成一个
|
||
final requestId = request.headers['X-Trace-ID'] ?? uuid.v4();
|
||
String? spanId = request.headers['X-Span-ID'];
|
||
String? parentSpanId;
|
||
if (spanId != null) {
|
||
parentSpanId = spanId;
|
||
}
|
||
spanId = uuid.v4();
|
||
final updatedRequest = request.change(context: {
|
||
'request_trace_id': requestId,
|
||
"request_span_id": spanId,
|
||
"request_parent_span_id": parentSpanId,
|
||
"request_stop_watch": Stopwatch()
|
||
});
|
||
return innerHandler(updatedRequest);
|
||
};
|
||
};
|
||
}
|
||
|
||
Future<Response> _routerHandler(Request request) async {
|
||
String path = request.url.path;
|
||
if (_wsCall.containsKey(path)) {
|
||
final String spanId = request.context['request_span_id'] as String;
|
||
final String requestId = request.context['request_trace_id'] as String;
|
||
final String? requestParentSpanId =
|
||
request.context['request_parent_span_id'] as String?;
|
||
return await webSocketHandler((channel) {
|
||
_wsCall[path]!.chanelMap[spanId] = channel;
|
||
_wsCall[path]!.open(spanId);
|
||
// 监听 WebSocket 消息
|
||
channel.stream.listen((message) {
|
||
// print('Received message: $message');
|
||
// 处理消息并回应客户端
|
||
// channel.sink.add(message);
|
||
_wsCall[path]!.message(spanId, message);
|
||
}, onDone: () {
|
||
try {
|
||
_wsCall[path]!.close(spanId);
|
||
_wsCall[path]!.chanelMap.remove(spanId);
|
||
} catch (e) {
|
||
print(e);
|
||
}
|
||
|
||
final stopwatch = request.context["request_stop_watch"] as Stopwatch;
|
||
stopwatch.stop();
|
||
logger.info(
|
||
'parentSpanId=$requestParentSpanId | WebSocket closed | 连接时长: ${stopwatch.elapsedMilliseconds}ms',
|
||
tag: "webserver",
|
||
traceId: requestId,
|
||
spanId: spanId);
|
||
}, onError: (error) {
|
||
logger.info(
|
||
'parentSpanId=$requestParentSpanId | WebSocket error: $error',
|
||
tag: "webserver",
|
||
traceId: requestId,
|
||
spanId: spanId);
|
||
});
|
||
})(request);
|
||
} else {
|
||
return await _router.call(request);
|
||
}
|
||
}
|
||
|
||
@override
|
||
void start(int port,
|
||
{Response? Function(Request, Map<String, dynamic>)? interceptor,
|
||
TokenResult Function(String?)? tokenCheck}) async {
|
||
var pipeline = Pipeline()
|
||
//生成请求id
|
||
.addMiddleware(requestIdMiddleware())
|
||
//api日志记录
|
||
.addMiddleware(logRequests());
|
||
//添加自定义拦截器
|
||
if (interceptor != null) {
|
||
this.interceptor = interceptor;
|
||
}
|
||
if (tokenCheck != null) {
|
||
this.tokenCheck = tokenCheck;
|
||
}
|
||
final handler = pipeline.addHandler(_routerHandler);
|
||
_server = await serve(handler, InternetAddress.anyIPv4, port);
|
||
print('Server listening on port ${_server?.port}');
|
||
}
|
||
|
||
@override
|
||
void stop() {
|
||
_server?.close();
|
||
}
|
||
|
||
Response setResponseHeader(response, contentType) {
|
||
Map<String, String> headers = {...response.headers};
|
||
headers['Access-Control-Allow-Origin'] = '*';
|
||
headers['Access-Control-Allow-Headers'] =
|
||
'Content-Type,X-Span-ID,X-Trace-ID,token';
|
||
headers["Access-Control-Expose-Headers"] =
|
||
"Content-Type,X-Span-ID,X-Trace-ID,token";
|
||
headers['Access-Control-Allow-Methods'] = 'GET, POST, PUT, DELETE, OPTIONS';
|
||
headers["content-type"] = contentType;
|
||
|
||
return response.change(headers: headers);
|
||
}
|
||
|
||
@override
|
||
void addHandler(handler) {
|
||
try {
|
||
handler.callHandler = handler;
|
||
} catch (e) {
|
||
print(
|
||
"请在 ${handler.runtimeType} 中实现set方法 set callHandler(handler) => _callHandler = handler;");
|
||
return;
|
||
}
|
||
try {
|
||
handler.routeMap.forEach((k, v) {
|
||
for (var p in v) {
|
||
if (k == HttpMethod.WS) {
|
||
addRouter(k, p[0], handler);
|
||
continue;
|
||
}
|
||
addRouter(k, p[0], (req, [a, b, c, d, e, f]) async {
|
||
//从request中获取token并解码传入
|
||
bool auth = true;
|
||
if (p.length >= 5) {
|
||
//判断是否需要进行token校验
|
||
auth = p[4];
|
||
}
|
||
dynamic payload;
|
||
var response;
|
||
|
||
if (auth) {
|
||
String? token = req.headers["token"];
|
||
bool check = true;
|
||
if (tokenCheck != null) {
|
||
//自定义token校验函数存在
|
||
var checkResult = tokenCheck!(token);
|
||
if (!checkResult.status) {
|
||
//校验失败
|
||
response = Response.ok(checkResult.errMsg);
|
||
return setResponseHeader(response, p[1].type);
|
||
}
|
||
payload = checkResult.payload;
|
||
check = true;
|
||
} else {
|
||
if (token != null) {
|
||
try {
|
||
payload = JWT.decode(token).payload;
|
||
check = true;
|
||
} catch (e) {
|
||
//jwt解码失败
|
||
print(e);
|
||
check = false;
|
||
}
|
||
} else {
|
||
check = false;
|
||
}
|
||
}
|
||
if (!check) {
|
||
response =
|
||
Response.ok(jsonEncode({"code": -10000, "msg": "token为空"}));
|
||
return setResponseHeader(response, p[1].type);
|
||
}
|
||
}
|
||
//自定义拦截函数
|
||
if (interceptor != null) {
|
||
var response = interceptor!(req, payload ?? {});
|
||
if (response != null) {
|
||
return response;
|
||
}
|
||
}
|
||
|
||
if (p[3] == 2) {
|
||
response = await p[2](req, payload ?? <String, dynamic>{});
|
||
} else {
|
||
response = await p[2](req);
|
||
}
|
||
if (response is! Response) {
|
||
//返回数据不是 Response 对象
|
||
response = Response.ok(response.toString());
|
||
}
|
||
response = setResponseHeader(response, p[1].type);
|
||
|
||
return response;
|
||
});
|
||
}
|
||
});
|
||
} catch (e) {
|
||
if (e is NoSuchMethodError) {
|
||
print(
|
||
"请在 ${handler.runtimeType} 中实现get方法 Map<HttpMethod, List<List>> get routeMap => routes;");
|
||
return;
|
||
} else {
|
||
rethrow;
|
||
}
|
||
}
|
||
}
|
||
|
||
void addRouter(HttpMethod method, String path, dynamic handler) {
|
||
if (method == HttpMethod.WS) {
|
||
//添加websocket连接处理函数
|
||
if (handler is WebSocketHandler) {
|
||
_wsCall[path] = handler;
|
||
} else {
|
||
print("回调函数类型错误 需要 WebSocketHandler ");
|
||
}
|
||
} else {
|
||
if (method.name == "ALL") {
|
||
_router.all(path, handler);
|
||
} else {
|
||
_router.add(method.name, path, handler);
|
||
}
|
||
|
||
if (method.name != "OPTIONS") {
|
||
_router.options(path, (r) {
|
||
return Response.ok('', headers: {
|
||
'Access-Control-Allow-Origin': '*',
|
||
'Access-Control-Allow-Headers':
|
||
'Content-Type,X-Span-ID,X-Trace-ID,token',
|
||
'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
|
||
});
|
||
});
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
abstract class WebSocketHandler {
|
||
Map<String, dynamic> chanelMap = {};
|
||
void open(String id);
|
||
void close(String id);
|
||
void message(String id, dynamic message);
|
||
|
||
void sendData(String id, dynamic data) {
|
||
if (chanelMap.containsKey(id)) {
|
||
chanelMap[id].sink.add(data);
|
||
} else {
|
||
print("id未找到");
|
||
}
|
||
}
|
||
}
|