// Files
// easy_dart_module/lib/base/webserver/impl/ShelfWebServer.dart
//
// 367 lines
// 12 KiB
// Dart
// Raw Blame History
//
// This file contains ambiguous Unicode characters
// This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import 'dart:convert';
import 'dart:io';
import 'package:EasyDartModule/EasyDartModule.dart';
import 'package:EasyDartModule/base/logger/Logger.dart';
import 'package:EasyDartModule/base/webserver/WebServer.dart';
import 'package:dart_jsonwebtoken/dart_jsonwebtoken.dart';
import 'package:shelf/shelf.dart';
import 'package:shelf/shelf_io.dart';
import 'package:shelf_router/shelf_router.dart';
import 'package:shelf_static/shelf_static.dart';
import 'package:shelf_web_socket/shelf_web_socket.dart';
import 'package:uuid/uuid.dart';
/// [WebServer] implementation backed by the `shelf` packages.
///
/// Serves HTTP routes through a [Router], hands requests for registered
/// WebSocket paths to a [WebSocketHandler], and tags every request with
/// trace/span identifiers that are used when logging.
class ShelfWebServer implements WebServer {
/// Logger that receives request, response and WebSocket log entries.
late Logger logger;
/// Server returned by `serve` in [start]; `null` before that.
HttpServer? _server;
/// Router holding every non-WebSocket route added through [addRouter].
final Router _router = Router();
/// Generates the trace and span identifiers in [requestIdMiddleware].
final Uuid uuid = Uuid();
/// WebSocket handlers keyed by request path (`request.url.path`).
final Map<String, WebSocketHandler> _wsCall = {};
/// Optional custom token validator. When `null`, authenticated routes added
/// by [addHandler] fall back to decoding the token as a JWT.
Future<TokenResult> Function(String?)? tokenCheck;
/// Optional hook run before each route callback. A non-null return value is
/// sent as the response instead of invoking the callback.
Response? Function(Request, Map<String, dynamic>)? interceptor;
/// Creates a server that logs through [logger].
ShelfWebServer(this.logger);
/// Tag attached to the log entries written by this class.
final String tag = "webserver";
/// Wraps [data] in a stream that emits it as one chunk of encoded bytes.
///
/// The text is converted with [encoding], which defaults to UTF-8. Encoding
/// happens lazily, once the returned stream is listened to.
Stream<List<int>> encodeStream(
  String data, {
  Encoding encoding = utf8,
}) async* {
  final bytes = encoding.encode(data);
  yield bytes;
}
/// Collects every chunk of [stream] and decodes the bytes into a string.
///
/// Uses [encoding] for the conversion, which defaults to UTF-8.
Future<String> decodeStream(
  Stream<List<int>> stream, {
  Encoding encoding = utf8,
}) =>
    encoding.decodeStream(stream);
/// Returns a middleware that logs every request and its outcome.
///
/// For each request it logs the method, URI and body, times the inner handler
/// with the stopwatch stored in the request context, and then logs either the
/// response status or the error. An error thrown by the inner handler becomes
/// a 500 response that carries the trace and span IDs; [HijackException] is
/// rethrown untouched.
///
/// Reads the `request_trace_id`, `request_span_id`,
/// `request_parent_span_id` and `request_stop_watch` context entries that
/// [requestIdMiddleware] installs, so it must run after that middleware.
Middleware logRequests() {
  return (Handler innerHandler) {
    return (Request request) async {
      var body = '';
      // Multipart uploads are forwarded without being buffered or logged.
      if (request.mimeType != 'multipart/form-data') {
        // A request body can be read only once, so buffer the raw bytes and
        // hand exactly those bytes to the inner handler afterwards.
        final buffer = BytesBuilder(copy: false);
        await for (final chunk in request.read()) {
          buffer.add(chunk);
        }
        final bytes = buffer.takeBytes();
        try {
          body = (request.encoding ?? utf8).decode(bytes);
        } on FormatException {
          // Bodies that are not valid text in the declared encoding must not
          // abort the request; log a placeholder instead of the content.
          body = '<${bytes.length} bytes of non-text data>';
        }
        request = request.change(body: bytes);
      }

      final requestId = request.context['request_trace_id'] as String;
      final requestSpanId = request.context['request_span_id'] as String;
      final requestParentSpanId =
          request.context['request_parent_span_id'] as String?;
      logger.info('| 请求路径: ${request.method} ${request.requestedUri} $body',
          tag: tag,
          traceId: requestId,
          spanId: requestSpanId,
          parentSpanId: requestParentSpanId);

      // The stopwatch is created by requestIdMiddleware; it is started here,
      // so the reported time covers the inner handler only.
      final stopwatch = request.context["request_stop_watch"] as Stopwatch;
      stopwatch.start();

      Response response;
      try {
        response = await innerHandler(request);
        stopwatch.stop();
        logger.info(
            '| 响应状态码: ${response.statusCode} | 响应时间: ${stopwatch.elapsedMilliseconds}ms',
            tag: tag,
            traceId: requestId,
            spanId: requestSpanId,
            parentSpanId: requestParentSpanId);
      } on HijackException {
        // Not an error: the connection was hijacked, so it must propagate.
        rethrow;
      } catch (e, s) {
        stopwatch.stop();
        logger.error("| 服务器错误 | $e $s",
            tag: tag,
            traceId: requestId,
            spanId: requestSpanId,
            parentSpanId: requestParentSpanId);
        response = Response(500,
            body:
                "Internal Server Error\r\nTraceId:$requestId\r\nSpanId:$requestSpanId");
      }
      return response;
    };
  };
}
/// Returns a middleware that tags every request with tracing identifiers.
///
/// The trace ID is taken from the `X-Trace-ID` header when present and
/// generated otherwise. A fresh span ID is always generated; an incoming
/// `X-Span-ID` header, if any, is recorded as the parent span. The values are
/// stored in the request context under `request_trace_id`,
/// `request_span_id` and `request_parent_span_id`, together with an unstarted
/// [Stopwatch] under `request_stop_watch`.
Middleware requestIdMiddleware() {
  return (Handler innerHandler) => (Request request) async {
        final incoming = request.headers;
        // Reuse the caller-supplied trace ID when there is one.
        final traceId = incoming['X-Trace-ID'] ?? uuid.v4();
        // The caller's span, if sent, becomes the parent of the new span.
        final parentSpanId = incoming['X-Span-ID'];
        final spanId = uuid.v4();

        final tagged = request.change(context: {
          'request_trace_id': traceId,
          "request_span_id": spanId,
          "request_parent_span_id": parentSpanId,
          "request_stop_watch": Stopwatch()
        });
        return innerHandler(tagged);
      };
}
/// Root handler: upgrades requests for registered WebSocket paths and sends
/// everything else to the HTTP router.
///
/// For a WebSocket path the new channel is stored in the handler's
/// `chanelMap` under the request's span ID, then the handler's `open`,
/// `message` and `close` callbacks are driven from the channel's stream.
/// Reads the `request_*` context entries installed by [requestIdMiddleware].
Future<Response> _routerHandler(Request request) async {
  final path = request.url.path;
  // Resolve the handler once so that open, message and close all go to the
  // same instance that the channel was registered with.
  final wsHandler = _wsCall[path];
  if (wsHandler == null) {
    return await _router.call(request);
  }

  final spanId = request.context['request_span_id'] as String;
  final requestId = request.context['request_trace_id'] as String;
  final requestParentSpanId =
      request.context['request_parent_span_id'] as String?;

  return await webSocketHandler((channel) {
    wsHandler.chanelMap[spanId] = channel;
    wsHandler.open(spanId);
    // Forward every incoming WebSocket message to the handler.
    channel.stream.listen((message) {
      wsHandler.message(spanId, message);
    }, onDone: () {
      try {
        wsHandler.close(spanId);
      } catch (e, s) {
        logger.error(
            'parentSpanId=$requestParentSpanId | WebSocket close error: $e $s',
            tag: tag,
            traceId: requestId,
            spanId: spanId,
            parentSpanId: requestParentSpanId);
      } finally {
        // Always drop the channel, even when the close callback throws, so
        // finished connections do not accumulate in the map.
        wsHandler.chanelMap.remove(spanId);
      }
      final stopwatch = request.context["request_stop_watch"] as Stopwatch;
      stopwatch.stop();
      logger.info(
          'parentSpanId=$requestParentSpanId | WebSocket closed | 连接时长: ${stopwatch.elapsedMilliseconds}ms',
          tag: tag,
          traceId: requestId,
          spanId: spanId);
    }, onError: (error) {
      logger.error(
          'parentSpanId=$requestParentSpanId | WebSocket error: $error',
          tag: tag,
          traceId: requestId,
          spanId: spanId,
          parentSpanId: requestParentSpanId);
    });
  })(request);
}
/// Builds the request pipeline and starts listening on [port] on all IPv4
/// interfaces.
///
/// [interceptor] and [tokenCheck], when given, replace the hooks stored on
/// this instance; omitted arguments leave the current hooks in place. When
/// [static] is given, a static file handler rooted at `static.path` (with
/// `static.root` as the default document) is cascaded after the router.
///
/// The returned future completes once the server is bound, and completes
/// with an error if binding fails, so callers can await and handle it.
@override
Future<void> start(int port,
    {Response? Function(Request, Map<String, dynamic>)? interceptor,
    Future<TokenResult> Function(String?)? tokenCheck,
    StaticInfo? static}) async {
  final pipeline = Pipeline()
      // Assign trace/span IDs first; the logging middleware reads them.
      .addMiddleware(requestIdMiddleware())
      // Log every request and response.
      .addMiddleware(logRequests())
      // Stamp every response with the X-Frame-Options header.
      .addMiddleware(createMiddleware(responseHandler: (response) {
        final headers = {...response.headers, "X-Frame-Options": "ALLOWALL"};
        return response.change(headers: headers);
      }));

  // Install the custom hooks; a null argument keeps the existing hook.
  this.interceptor = interceptor ?? this.interceptor;
  this.tokenCheck = tokenCheck ?? this.tokenCheck;

  final Handler handler;
  if (static != null) {
    // Router first, static files second (shelf `Cascade`).
    final routerThenStatic = Cascade()
        .add(_routerHandler)
        .add(createStaticHandler(static.path, defaultDocument: static.root))
        .handler;
    handler = pipeline.addHandler(routerThenStatic);
    print("HTTP Server Static File Path: ${static.path}");
  } else {
    handler = pipeline.addHandler(_routerHandler);
  }

  _server = await serve(handler, InternetAddress.anyIPv4, port);
  print('HTTP Server Listening On Port ${_server?.port}');
}
/// Stops the server if it is running.
///
/// The returned future completes when the underlying [HttpServer] has
/// closed. The stored server reference is cleared, so calling this again is
/// a no-op.
@override
Future<void> stop() async {
  final server = _server;
  _server = null;
  await server?.close();
}
/// Returns a copy of [response] with the CORS headers and the final
/// `content-type` applied.
///
/// If the response carries a `user-content-type` header, its value becomes
/// the `content-type` and the helper header itself is removed. Otherwise
/// [contentType] is used.
///
/// [response] is expected to be a shelf [Response] and [contentType] a
/// [String]; both are untyped because callers pass dynamic values.
Response setResponseHeader(response, contentType) {
  // Look the override up on the response's own header map rather than on a
  // plain copy, so the lookup follows shelf's header-key handling.
  final Map<String, String> current = response.headers;

  final updates = <String, Object?>{
    'Access-Control-Allow-Origin': '*',
    'Access-Control-Allow-Headers': 'Content-Type,X-Span-ID,X-Trace-ID,token',
    "Access-Control-Expose-Headers": "Content-Type,X-Span-ID,X-Trace-ID,token",
    'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
  };

  final override = current["user-content-type"];
  if (override != null) {
    // `change` merges its argument into the existing headers, so leaving a
    // key out does not delete it. A null value is what removes the header;
    // use the key exactly as it is stored on the response.
    for (final name in current.keys) {
      if (name.toLowerCase() == "user-content-type") {
        updates[name] = null;
      }
    }
    updates["content-type"] = override;
  } else {
    final String fallback = contentType;
    updates["content-type"] = fallback;
  }
  return response.change(headers: updates);
}
/// Registers every route exposed by [handler].
///
/// [handler] must provide a `callHandler` setter and a `routeMap` getter
/// that maps an [HttpMethod] to a list of route entries. If either is
/// missing, a hint is printed and nothing is registered. Each entry is a
/// list read as follows:
///
/// * `[0]` route path
/// * `[1]` object whose `type` is used as the response content type
/// * `[2]` callback that produces the response
/// * `[3]` when equal to `2`, the callback also receives the token payload
/// * `[4]` optional flag; `false` disables token checking for the route
///
/// WebSocket entries register [handler] itself for the path. All other
/// entries are wrapped so that each request is authenticated (unless
/// disabled), offered to [interceptor], dispatched to the callback, and
/// finally passed through [setResponseHeader].
@override
void addHandler(handler) {
  try {
    handler.callHandler = handler;
  } catch (e) {
    print(
        "请在 ${handler.runtimeType} 中实现set方法 set callHandler(handler) => _callHandler = handler;");
    return;
  }

  try {
    handler.routeMap.forEach((method, routes) {
      for (final route in routes) {
        if (method == HttpMethod.WS) {
          addRouter(method, route[0], handler);
          continue;
        }

        addRouter(method, route[0], (Request req, [a, b, c, d, e, f]) async {
          // Routes are authenticated unless the entry opts out at index 4.
          final bool needsAuth = route.length >= 5 ? route[4] : true;

          // Decoded token payload that is handed to the interceptor and to
          // callbacks that accept it.
          dynamic payload;
          if (needsAuth) {
            // The token may arrive as a header or as a query parameter.
            final token =
                req.headers["token"] ?? req.url.queryParameters["token"];
            final verify = tokenCheck;
            if (verify != null) {
              // A custom validator is configured: it alone decides.
              final verdict = await verify(token);
              if (!verdict.status) {
                return setResponseHeader(
                    Response.ok(verdict.errMsg), route[1].type);
              }
              payload = verdict.payload;
            } else {
              // NOTE(review): `JWT.decode` reads the payload without
              // verifying the signature — confirm that this fallback is
              // acceptable when no `tokenCheck` is configured.
              String? failure;
              if (token == null) {
                failure = "token为空";
              } else {
                try {
                  payload = JWT.decode(token).payload;
                } catch (e) {
                  // The token could not be decoded as a JWT.
                  print("token: $token msg: $e");
                  failure = "身份认证失败";
                }
              }
              if (failure != null) {
                return setResponseHeader(
                    Response.ok(jsonEncode({"code": -10000, "msg": failure})),
                    route[1].type);
              }
            }
          }

          // Custom interceptor: a non-null result short-circuits the route
          // and is returned as-is.
          final intercept = interceptor;
          if (intercept != null) {
            final shortCircuit =
                intercept(req, payload ?? <String, dynamic>{});
            if (shortCircuit != null) {
              return shortCircuit;
            }
          }

          dynamic result;
          if (route[3] == 2) {
            result = await route[2](req, payload ?? <String, dynamic>{});
          } else {
            result = await route[2](req);
          }
          if (result is! Response) {
            // Callbacks may return plain values; wrap them in a 200 response.
            result = Response.ok(result.toString());
          }
          result = setResponseHeader(result, route[1].type);
          return result;
        });
      }
    });
  } on NoSuchMethodError {
    print(
        "请在 ${handler.runtimeType} 中实现get方法 Map<HttpMethod, List<List>> get routeMap => routes;");
  }
}
/// Registers [handler] for [path] under [method].
///
/// For [HttpMethod.WS], [handler] must be a [WebSocketHandler]; it is stored
/// for the path, and any other type only prints a warning. For every other
/// method the handler is added to the router, and unless the method is
/// `OPTIONS` an additional `OPTIONS` route answering with the CORS headers
/// is added for the same path.
void addRouter(HttpMethod method, String path, dynamic handler) {
  if (method == HttpMethod.WS) {
    // WebSocket endpoints are kept in their own table, not in the router.
    if (handler is! WebSocketHandler) {
      print("回调函数类型错误 需要 WebSocketHandler ");
      return;
    }
    _wsCall[path] = handler;
    return;
  }

  final verb = method.name;
  if (verb == "ALL") {
    _router.all(path, handler);
  } else {
    _router.add(verb, path, handler);
  }

  if (verb == "OPTIONS") {
    return;
  }
  _router.options(
    path,
    (r) => Response.ok('', headers: {
      'Access-Control-Allow-Origin': '*',
      'Access-Control-Allow-Headers':
          'Content-Type,X-Span-ID,X-Trace-ID,token',
      'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
    }),
  );
}
}
/// Base class for the WebSocket endpoint of one route path.
///
/// Subclasses implement the lifecycle callbacks; [forceClose] and [sendData]
/// act on the channels stored in [chanelMap].
abstract class WebSocketHandler {
  /// Open channels keyed by connection id.
  Map<String, dynamic> chanelMap = {};

  /// Called when the connection identified by [id] has been opened.
  void open(String id);

  /// Called when the connection identified by [id] has finished.
  void close(String id);

  /// Called for every [message] received on connection [id].
  void message(String id, dynamic message);

  /// Closes connection [id], passing code 4000 and [msg] to the channel's
  /// sink. Does nothing when [id] is unknown.
  void forceClose(String id, {String? msg}) {
    if (!chanelMap.containsKey(id)) {
      return;
    }
    chanelMap[id].sink.close(4000, msg);
  }

  /// Adds [data] to the sink of connection [id]; prints a notice when [id]
  /// is unknown.
  void sendData(String id, dynamic data) {
    if (!chanelMap.containsKey(id)) {
      print("id未找到");
      return;
    }
    chanelMap[id].sink.add(data);
  }
}