From 34d37e579b008f106d23ee0ff1fc0b064254b0c4 Mon Sep 17 00:00:00 2001 From: qmqz Date: Mon, 31 Mar 2025 15:08:04 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0http=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E5=99=A8=20=E5=89=94=E9=99=A4=E5=8F=8D=E5=B0=84=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.yaml | 14 ++ lib/EasyDartModule.dart | 4 + lib/base/http/TraceDio.dart | 20 +-- lib/base/mqtt/mqtt.dart | 36 +++- lib/base/webserver/WebServer.dart | 19 ++- lib/base/webserver/impl/RouteGenerator.dart | 85 ++++++++++ lib/base/webserver/impl/ShelfWebServer.dart | 158 ++++++++---------- .../webserver/impl/ShelfWebServer_none.dart | 2 +- pubspec.yaml | 4 + 9 files changed, 233 insertions(+), 109 deletions(-) create mode 100644 build.yaml create mode 100644 lib/base/webserver/impl/RouteGenerator.dart diff --git a/build.yaml b/build.yaml new file mode 100644 index 0000000..d50d82f --- /dev/null +++ b/build.yaml @@ -0,0 +1,14 @@ +targets: + $default: + builders: + EasyDartModule|route_generator: + enabled: true + +builders: + route_generator: + target: ":route_generator" + import: "package:EasyDartModule/base/webserver/impl/RouteGenerator.dart" + builder_factories: ["routeGenerator"] + build_extensions: { ".dart": [".route.dart"] } + auto_apply: root_package + build_to: source diff --git a/lib/EasyDartModule.dart b/lib/EasyDartModule.dart index c402ad0..1de3c8d 100644 --- a/lib/EasyDartModule.dart +++ b/lib/EasyDartModule.dart @@ -15,6 +15,7 @@ import 'package:EasyDartModule/base/webserver/WebServer.dart'; import 'package:EasyDartModule/base/webserver/impl/ShelfWebServer.dart' if (dart.library.html) 'package:EasyDartModule/base/webserver/impl/ShelfWebServer_none.dart'; import 'package:EasyDartModule/base/websocket/WebSocket.dart'; +import 'package:event_bus/event_bus.dart'; export 'package:shelf/shelf.dart'; export 'package:mongo_dart/mongo_dart.dart'; @@ -29,6 +30,9 @@ class EasyDartModule { static Storage get storage => Storage.getInstance(); static Redis get redis => Redis.getInstance(); static WebSocket get websocket => WebSocket.getInstance(); + // 全局事件总线 + static final EventBus _eventBus = EventBus(); + static EventBus get eventBus => _eventBus; static bool init({ DiscoveryConfig? discoveryConfig, diff --git a/lib/base/http/TraceDio.dart b/lib/base/http/TraceDio.dart index c95e102..56b68af 100644 --- a/lib/base/http/TraceDio.dart +++ b/lib/base/http/TraceDio.dart @@ -62,7 +62,7 @@ class TraceDio { "请求错误: url:${e.requestOptions.path} 请求方法:${e.requestOptions.method} 请求数据:${e.requestOptions.data} 响应代码:${e.response?.statusCode} 响应消息:${e.response?.statusMessage} 响应内容:${e.response?.data}"; } else { errMsg = - "请求错误: url:${e.requestOptions.path} 请求方法:${e.requestOptions.method} 请求数据:${e.requestOptions.data} 错误消息:${e.error}"; + "请求错误: url:${e.requestOptions.path} 请求方法:${e.requestOptions.method} 请求数据:${e.requestOptions.data} 错误消息:${e.message ?? e.error}"; } print(errMsg); @@ -103,18 +103,12 @@ class TraceDio { sf.Request? request, ResponseType? responseType, Duration? receiveTimeout}) async { - try { - return await _dio.get(url, - queryParameters: queryParameters, - options: Options( - receiveTimeout: receiveTimeout, - headers: getHeader(request: request), - responseType: responseType)); - } catch (e) { - //api接口报错 - var a = e as DioException; - return a.response!; - } + return await _dio.get(url, + queryParameters: queryParameters, + options: Options( + receiveTimeout: receiveTimeout, + headers: getHeader(request: request), + responseType: responseType)); } // 发起 POST 请求 diff --git a/lib/base/mqtt/mqtt.dart b/lib/base/mqtt/mqtt.dart index e225ed9..0bed22a 100644 --- a/lib/base/mqtt/mqtt.dart +++ b/lib/base/mqtt/mqtt.dart @@ -1,4 +1,5 @@ import 'dart:convert'; +import 'dart:typed_data'; import 'package:mqtt5_client/mqtt5_client.dart'; import 'package:mqtt5_client/mqtt5_server_client.dart'; @@ -29,7 +30,14 @@ class Mqtt { _config.port); _client?.autoReconnect = true; - var state = await _client?.connect(_config.username, _config.password); + MqttConnectionStatus? state; + try { + state = await _client?.connect(_config.username, _config.password); + } catch (e) { + //连接错误 + print("mqtt连接失败: 地址:${_config.host} 原因:$e"); + return false; + } if (state!.state != MqttConnectionState.connected) { //连接失败 print("mqtt连接失败: 地址:${_config.host} 原因:$state"); @@ -39,10 +47,20 @@ class Mqtt { _client?.updates.listen((List> message) { final recMess = message[0].payload as MqttPublishMessage; - final payload = Utf8Decoder().convert(recMess.payload.message!); - // final payload = - // MqttUtilities.bytesToStringAsString(recMess.payload.message!); - _config.messgae(message[0].topic!, payload); + try { + if (_config.messgae != null) { + final payload = Utf8Decoder().convert(recMess.payload.message!); + // final payload = + // MqttUtilities.bytesToStringAsString(recMess.payload.message!); + _config.messgae!(message[0].topic!, payload); + } else if (_config.buffMessage != null) { + _config.buffMessage!( + message[0].topic!, recMess.payload.message!.buffer.asUint8List()); + } + } catch (e) { + //转字符串失败 + print(e); + } }); _config.topic?.forEach((topic) { subscribe(topic, _config.qos); @@ -80,13 +98,17 @@ class MqttConfig { String? password; List? topic; int qos; - Function(String topic, String message) messgae; + //文本消息 + Function(String topic, String message)? messgae; + //二进制消息 + Function(String topic, Uint8List buff)? buffMessage; MqttConfig( {required this.host, this.port = 1883, required this.clientId, - required this.messgae, + this.messgae, + this.buffMessage, this.topic, this.qos = 0, this.username, diff --git a/lib/base/webserver/WebServer.dart b/lib/base/webserver/WebServer.dart index 035af51..61507e2 100644 --- a/lib/base/webserver/WebServer.dart +++ b/lib/base/webserver/WebServer.dart @@ -1,4 +1,3 @@ -import 'dart:io'; abstract class WebServer { static late WebServer _webServer; @@ -11,7 +10,7 @@ abstract class WebServer { _webServer = server; } - void start(int port); + void start(int port, {Function? interceptor}); void stop(); void addHandler(handler); } @@ -24,6 +23,14 @@ enum HttpMethod { ALL, WS, ; + + static HttpMethod valueOf(String type) { + var tmp = HttpMethod.values.where((t) => t.name == type).toList(); + if (tmp.isEmpty) { + return HttpMethod.ALL; + } + return tmp[0]; + } } enum HttpResponseType { @@ -37,6 +44,14 @@ enum HttpResponseType { final String type; const HttpResponseType(this.type); + + static HttpResponseType valueOf(String type) { + var tmp = HttpResponseType.values.where((t) => t.type == type).toList(); + if (tmp.isEmpty) { + return HttpResponseType.JSON; + } + return tmp[0]; + } } class RequestMapping { diff --git a/lib/base/webserver/impl/RouteGenerator.dart b/lib/base/webserver/impl/RouteGenerator.dart new file mode 100644 index 0000000..aa434f6 --- /dev/null +++ b/lib/base/webserver/impl/RouteGenerator.dart @@ -0,0 +1,85 @@ +import 'package:EasyDartModule/base/webserver/WebServer.dart'; +import 'package:build/build.dart'; +import 'package:analyzer/dart/element/element.dart'; +import 'package:source_gen/source_gen.dart'; +import 'package:path/path.dart'; + +Builder routeGenerator(BuilderOptions options) { + return LibraryBuilder( + RouteGenerator(), + generatedExtension: '.route.dart', + ); +} + +class RouteGenerator extends GeneratorForAnnotation { + call(String name) {} + @override + generateForAnnotatedElement( + Element element, ConstantReader annotation, BuildStep buildStep) { + // 读取注解字段 + String path = annotation.read('path').stringValue; + String method = annotation.read('method').read("_name").stringValue; + // String response = annotation.read("responseType").read("_name").stringValue; + // print("url:$path $method"); + //0:路径 1:响应类型 2:调用函数 3:参数数量 + Map routes = {}; + if (method != HttpMethod.WS.name) { + if (path.endsWith("/")) { + path = path.substring(0, path.length - 1); + } + for (final method in element.children) { + for (final metaData in method.metadata) { + var anno = ConstantReader(metaData.computeConstantValue()); + if (!anno.instanceOf(typeChecker)) { + continue; + } + + int pl = (method as MethodElement).parameters.length; + // print( + // "url:$path-${anno.read("path").stringValue} method:${anno.read("method").read("_name").stringValue} response:${anno.read("responseType").read("_name").stringValue}"); + //转换为枚举类型 + + var p = anno.read("path").stringValue; + if (!p.startsWith("/")) { + p = "$path/$p"; + } else { + p = path + p; + } + //加入调用缓存 + routes[HttpMethod.valueOf( + anno.read("method").read("_name").stringValue)] = [ + p, + HttpResponseType.valueOf( + anno.read("responseType").read("_name").stringValue), + () { + return method.name; + }, + pl >= 2 ? 2 : 1 + ]; + } + // print("构建完毕"); + } + } else { + //单独处理ws + routes[HttpMethod.valueOf(method)] = [path]; + } + // 生成路由映射代码 + // var data = jsonEncode(routes); + final routesMapString = routes.entries.map((entry) { + return '${entry.key}: [[${entry.value.map((e) { + if (e is String) { + return '"$e"'; + } else if (e is Function) { + return "_callHandler.${e()}"; + } + return e; + }).join(",")}]],'; + }).join('\n'); + // print(routes); + return ''' + part of '${basename(buildStep.inputId.path)}'; + late var _callHandler; + final Map> routes = {$routesMapString}; + '''; + } +} diff --git a/lib/base/webserver/impl/ShelfWebServer.dart b/lib/base/webserver/impl/ShelfWebServer.dart index 0d4bc76..b385be3 100644 --- a/lib/base/webserver/impl/ShelfWebServer.dart +++ b/lib/base/webserver/impl/ShelfWebServer.dart @@ -1,5 +1,4 @@ import 'dart:io'; -import 'dart:mirrors'; import 'package:EasyDartModule/EasyDartModule.dart'; import 'package:EasyDartModule/base/logger/Logger.dart'; @@ -50,7 +49,7 @@ class ShelfWebServer implements WebServer { } catch (e, s) { if (e is HijackException) { //不能处理该异常直接抛出 - throw e; + rethrow; } stopwatch.stop(); logger.error("| 服务器错误 | ${e.toString()} ${s.toString()}", @@ -135,12 +134,24 @@ class ShelfWebServer implements WebServer { } @override - void start(int port) async { + void start(int port, {Function? interceptor}) async { //反射获取全部路由地址 - final handler = Pipeline() + var pipeline = Pipeline() .addMiddleware(requestIdMiddleware()) - .addMiddleware(logRequests()) - .addHandler(_routerHandler); + .addMiddleware(logRequests()); + //判断拦截器类型 + if (interceptor != null) { + pipeline = pipeline.addMiddleware((Handler innerHandler) { + return (Request request) async { + var response = interceptor(request); + if (response == null) { + return innerHandler(request); + } + return response; + }; + }); + } + final handler = pipeline.addHandler(_routerHandler); _server = await serve(handler, InternetAddress.anyIPv4, port); print('Server listening on port ${_server?.port}'); } @@ -152,88 +163,63 @@ class ShelfWebServer implements WebServer { @override void addHandler(handler) { - ClassMirror cm = reflectClass(handler.runtimeType); - var im = reflect(handler); - String path = ""; - for (var metadata in cm.metadata) { - // 检查元数据是否为RequestMapping类型 - if (metadata.reflectee is RequestMapping) { - // 获取实例 - RequestMapping annotation = metadata.reflectee; - path = annotation.path; - if (path.endsWith("/")) { - path = path.substring(0, path.length - 1); + try { + handler.callHandler = handler; + } catch (e) { + print( + "请在 ${handler.runtimeType} 中实现set方法 set callHandler(handler) => _callHandler = handler;"); + return; + } + try { + handler.routeMap.forEach((k, v) { + for (var p in v) { + if (k == HttpMethod.WS) { + addRouter(k, p[0], handler); + continue; + } + addRouter(k, p[0], (req, [a, b, c, d, e, f]) async { + //从request中获取token并解码传入 + String? token = req.headers["token"]; + dynamic payload; + if (token != null) { + try { + payload = JWT.decode(token).payload; + } catch (e) { + //jwt解码失败 + print(e); + } + } + var response; + if (p[3] == 2) { + response = await p[2](req, payload ?? {}); + } else { + response = await p[2](req); + } + Map headers = {...response.headers}; + headers['Access-Control-Allow-Origin'] = '*'; + headers['Access-Control-Allow-Headers'] = + 'Content-Type,X-Span-ID,X-Trace-ID,token'; + headers["Access-Control-Expose-Headers"] = + "Content-Type,X-Span-ID,X-Trace-ID,token"; + headers['Access-Control-Allow-Methods'] = + 'GET, POST, PUT, DELETE, OPTIONS'; + headers["content-type"] = p[1].type; + + response = response.change(headers: headers); + + return response; + }); } - if (annotation.method == HttpMethod.WS) { - //wwebsocket处理句柄直接加入路由 - addRouter(annotation.method, path, handler); - return; - } - break; + }); + } catch (e) { + if (e is NoSuchMethodError) { + print( + "请在 ${handler.runtimeType} 中实现get方法 Map> get routeMap => routes;"); + return; + } else { + rethrow; } } - - im.type.declarations.forEach((k, v) { - if (v is MethodMirror) { - for (var m in v.metadata) { - if (m.reflectee is RequestMapping) { - var mp = m.reflectee.path as String; - - if (mp.startsWith("/")) { - mp = mp.substring(1, mp.length); - } - - String p = "$path/$mp"; - // print("method: ${m.reflectee.method} $p"); - //把地址加入路由 - - addRouter(m.reflectee.method, p, (req, [a, b, c, d, e, f]) async { - //从request中获取token并解码传入 - String? token = req.headers["token"]; - dynamic payload; - if (token != null) { - try { - payload = JWT.decode(token).payload; - } catch (e) { - //jwt解码失败 - print(e); - } - } - List args; - if (v.parameters.length == 2) { - args = [req, payload ?? {}]; - } else { - args = [req]; - } - Response response = - await im.invoke(v.simpleName, args).reflectee as Response; - //判断contenttype是否定义 - // var t = response.headers["content-type"]; - // print(t); - Map headers = {...response.headers}; - // if (t == null) { - //设置默认contenttype - // response = response.change(headers: { - // ...response.headers, - // "content-type": m.reflectee.responseType.type - // }); - // } else { - headers['Access-Control-Allow-Origin'] = '*'; - headers['Access-Control-Allow-Headers'] = - 'Content-Type,X-Span-ID,X-Trace-ID'; - headers['Access-Control-Allow-Methods'] = - 'GET, POST, PUT, DELETE, OPTIONS'; - headers["content-type"] = m.reflectee.responseType.type; - - // } - response = response.change(headers: headers); - - return response; - }); - } - } - } - }); } void addRouter(HttpMethod method, String path, dynamic handler) { @@ -255,7 +241,7 @@ class ShelfWebServer implements WebServer { _router.options(path, (r) { return Response.ok('', headers: { 'Access-Control-Allow-Origin': '*', - 'Access-Control-Allow-Headers': 'Content-Type,X-Span-ID,X-Trace-ID', + 'Access-Control-Allow-Headers': 'Content-Type,X-Span-ID,X-Trace-ID,token', 'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS', }); }); diff --git a/lib/base/webserver/impl/ShelfWebServer_none.dart b/lib/base/webserver/impl/ShelfWebServer_none.dart index b45fe3e..41b0168 100644 --- a/lib/base/webserver/impl/ShelfWebServer_none.dart +++ b/lib/base/webserver/impl/ShelfWebServer_none.dart @@ -6,7 +6,7 @@ class ShelfWebServer implements WebServer { final String tag = "webserver"; ShelfWebServer(this.logger); @override - void start(int port) async {} + void start(int port, {Function? interceptor}) async {} @override void stop() {} diff --git a/pubspec.yaml b/pubspec.yaml index c600927..a80ed51 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -21,6 +21,10 @@ dependencies: redis: ^4.0.0 dart_jsonwebtoken: ^2.14.2 web_socket_channel: ^3.0.1 + source_gen: ^1.5.0 + build: ^2.4.1 + analyzer: ^6.8.0 + event_bus: ^2.0.1 dev_dependencies: lints: ^4.0.0