From fd62ed3d98b63ba7e7be1e2a6342faaba81400b8 Mon Sep 17 00:00:00 2001 From: qmqz Date: Thu, 2 Jan 2025 10:08:03 +0800 Subject: [PATCH] =?UTF-8?q?=E9=A6=96=E6=AC=A1=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 7 + example/easy_dart_module_example.dart | 83 ++++++++ lib/EasyDartModule.dart | 59 ++++++ lib/base/database/DataBase.dart | 36 ++++ lib/base/database/impl/MongoDb.dart | 51 +++++ lib/base/discovery/Discovery.dart | 44 ++++ lib/base/discovery/impl/NacosDiscovery.dart | 211 +++++++++++++++++++ lib/base/http/TraceDio.dart | 119 +++++++++++ lib/base/logger/Logger.dart | 21 ++ lib/base/logger/impl/LokiLogger.dart | 72 +++++++ lib/base/mqtt/mqtt.dart | 82 ++++++++ lib/base/storage/Storage.dart | 36 ++++ lib/base/storage/impl/MinIoStorage.dart | 54 +++++ lib/base/webserver/WebServer.dart | 32 +++ lib/base/webserver/impl/ShelfWebServer.dart | 212 ++++++++++++++++++++ pubspec.yaml | 24 +++ 16 files changed, 1143 insertions(+) create mode 100644 .gitignore create mode 100644 example/easy_dart_module_example.dart create mode 100644 lib/EasyDartModule.dart create mode 100644 lib/base/database/DataBase.dart create mode 100644 lib/base/database/impl/MongoDb.dart create mode 100644 lib/base/discovery/Discovery.dart create mode 100644 lib/base/discovery/impl/NacosDiscovery.dart create mode 100644 lib/base/http/TraceDio.dart create mode 100644 lib/base/logger/Logger.dart create mode 100644 lib/base/logger/impl/LokiLogger.dart create mode 100644 lib/base/mqtt/mqtt.dart create mode 100644 lib/base/storage/Storage.dart create mode 100644 lib/base/storage/impl/MinIoStorage.dart create mode 100644 lib/base/webserver/WebServer.dart create mode 100644 lib/base/webserver/impl/ShelfWebServer.dart create mode 100644 pubspec.yaml diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3cceda5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +# https://dart.dev/guides/libraries/private-files +# Created by `dart pub` +.dart_tool/ + +# Avoid committing pubspec.lock for library packages; see +# https://dart.dev/guides/libraries/private-files#pubspeclock. +pubspec.lock diff --git a/example/easy_dart_module_example.dart b/example/easy_dart_module_example.dart new file mode 100644 index 0000000..f5b0039 --- /dev/null +++ b/example/easy_dart_module_example.dart @@ -0,0 +1,83 @@ +import 'dart:convert'; + +import 'package:EasyDartModule/EasyDartModule.dart'; +import 'package:EasyDartModule/base/database/DataBase.dart'; +import 'package:EasyDartModule/base/discovery/Discovery.dart'; +import 'package:EasyDartModule/base/logger/Logger.dart'; +import 'package:EasyDartModule/base/storage/Storage.dart'; +import 'package:EasyDartModule/base/webserver/WebServer.dart'; + +void main() async { + //初始化服务发现 + EasyDartModule.init( + discoveryConfig: DiscoveryConfig( + host: "http://10.20.1.2:8848", + namespaceId: "d3b43bfe-f584-4b8f-a390-353abc69c856")); + String ip = "10.20.0.80"; + int port = 9100; + String sn = "test-server"; + //注册实例 + await EasyDartModule.discovery.registerInstance(sn, ip, port); + + //查询日志服务配置信息 + String logger = await EasyDartModule.discovery.getConfig("logger"); + var loggerConfig = jsonDecode(logger); + //查询数据库配置信息 + String mongodb = await EasyDartModule.discovery.getConfig("mongodb"); + var mongodbConfig = jsonDecode(mongodb); + //查询存储配置 + String storage = await EasyDartModule.discovery.getConfig("storage"); + var storageConfig = jsonDecode(storage); + + EasyDartModule.init( + loggerConfig: LoggerConfig(host: loggerConfig["host"], serviceName: sn), + dataBaseConfig: DataBaseConfig( + host: mongodbConfig["host"], + userName: mongodbConfig["userName"], + password: mongodbConfig["password"], + dataBase: mongodbConfig["dataBase"]), + storageConfig: StorageConfig( + host: storageConfig["host"], + port: storageConfig["port"], + accessKey: storageConfig["accessKey"], + secretKey: storageConfig["secretKey"])); + + EasyDartModule.webServer.addHandler(TestController()); + EasyDartModule.webServer.start(port); + + //测试db + Future.delayed(Duration(seconds: 5), () { + var db = EasyDartModule.dataBase; + var r = db.query("uc_sys_user"); + print(r); + }); + + //测试minio + String bucketName = "test-bu"; + String objectName = "/aa/bb/objectName.gif"; + // EasyDartModule.storage.createBucket(bucketName).then((v) { + // EasyDartModule.storage + // .uploadObject( + // bucketName, + // objectName, + // File("F:\\qq\\11603720\\Image\\Group2\\\$C\\WL\\\$CWLJBX}@Z7E487L2J4MH`G.gif") + // .readAsBytesSync()) + // .then((path) { + // print("上传文件: $path"); + // EasyDartModule.storage.getObject(bucketName, objectName).then((data) { + // print("下载文件"); + // File("a.gif").writeAsBytesSync(data); + // print("下载完毕"); + // }); + // }); + // // EasyDartModule.storage.deleteObject(bucketName, "objectName"); + // }); +} + +@RequestMapping(path: "/test") +class TestController { + @RequestMapping(path: "/tt", method: HttpMethod.GET) + Response test(Request request) { + return Response(200, body: "ok"); + } +} diff --git a/lib/EasyDartModule.dart b/lib/EasyDartModule.dart new file mode 100644 index 0000000..6282c2a --- /dev/null +++ b/lib/EasyDartModule.dart @@ -0,0 +1,59 @@ +library EasyDartModule; + +import 'package:EasyDartModule/base/database/DataBase.dart'; +import 'package:EasyDartModule/base/database/impl/MongoDb.dart'; +import 'package:EasyDartModule/base/discovery/Discovery.dart'; +import 'package:EasyDartModule/base/discovery/impl/NacosDiscovery.dart'; +import 'package:EasyDartModule/base/http/TraceDio.dart'; +import 'package:EasyDartModule/base/logger/Logger.dart'; +import 'package:EasyDartModule/base/logger/impl/LokiLogger.dart'; +import 'package:EasyDartModule/base/mqtt/mqtt.dart'; +import 'package:EasyDartModule/base/storage/Storage.dart'; +import 'package:EasyDartModule/base/storage/impl/MinIoStorage.dart'; +import 'package:EasyDartModule/base/webserver/WebServer.dart'; +import 'package:EasyDartModule/base/webserver/impl/ShelfWebServer.dart'; + +export 'package:shelf/shelf.dart'; +export 'package:mongo_dart/mongo_dart.dart'; + +class EasyDartModule { + static Discovery get discovery => Discovery.getInstance(); + static DataBase get dataBase => DataBase.getInstance(); + static WebServer get webServer => WebServer.getInstance(); + static Logger get logger => Logger.getInstance(); + static TraceDio get dio => TraceDio.getInstance(); + static Mqtt get mqtt => Mqtt.getInstance(); + static Storage get storage => Storage.getInstance(); + static bool init( + {DiscoveryConfig? discoveryConfig, + DataBaseConfig? dataBaseConfig, + LoggerConfig? loggerConfig, + MqttConfig? mqttConfig, + StorageConfig? storageConfig}) { + if (discoveryConfig != null) { + //nacos注册配置中心 + Discovery.setInstance(NacosDiscovery(discoveryConfig)); + } + if (dataBaseConfig != null) { + //mongo数据库 + DataBase.setInstance(MongoDb(dataBaseConfig)); + } + if (mqttConfig != null) { + //mqtt + Mqtt.setInstance(Mqtt(mqttConfig)); + } + if (storageConfig != null) { + //s3存储 + Storage.setInstance(MinioStorage(storageConfig)); + } + if (loggerConfig != null) { + //初始化日志 + Logger.setInstance(LokiLogger(loggerConfig)); + //web服务器 + WebServer.setInstance(ShelfWebServer(logger)); + //Dio组件 + TraceDio.setInstance(TraceDio(logger)); + } + return true; + } +} diff --git a/lib/base/database/DataBase.dart b/lib/base/database/DataBase.dart new file mode 100644 index 0000000..9326fac --- /dev/null +++ b/lib/base/database/DataBase.dart @@ -0,0 +1,36 @@ +abstract class DataBase { + static late DataBase _dataBase; + + static DataBase getInstance() { + return _dataBase; + } + + static void setInstance(DataBase database) { + _dataBase = database; + } + + // 执行查询操作 + Future>> query(String table, {dynamic condition}); + + // 执行插入操作 + Future insert(String table, Map data); + + // 执行更新操作 + Future update( + String table, Map data, dynamic condition); + + // 执行删除操作 + Future delete(String table, dynamic condition); +} + +class DataBaseConfig { + String host; + String userName; + String password; + String dataBase; + DataBaseConfig( + {required this.host, + required this.userName, + required this.password, + required this.dataBase}); +} diff --git a/lib/base/database/impl/MongoDb.dart b/lib/base/database/impl/MongoDb.dart new file mode 100644 index 0000000..2d6209e --- /dev/null +++ b/lib/base/database/impl/MongoDb.dart @@ -0,0 +1,51 @@ +import 'package:EasyDartModule/base/database/DataBase.dart'; +import 'package:mongo_dart/mongo_dart.dart'; + +class MongoDb implements DataBase { + final DataBaseConfig config; + final Db db; + MongoDb(this.config) + : db = Db( + "mongodb://${config.userName}:${config.password}@${config.host}/${config.dataBase}?authSource=admin") { + Future.delayed(Duration(seconds: 1), () async { + do { + try { + await db.open(); + print('Connected successfully!'); + } catch (e) { + print('Connection error: $e'); + await Future.delayed(Duration(seconds: 1)); + } + } while (!db.isConnected); + }); + } + + DbCollection getCollection(String name) { + return db.collection(name); + } + + @override + Future delete(String table, dynamic condition) async { + await getCollection(table).deleteMany(condition); + } + + @override + Future insert(String table, Map data) async { + await getCollection(table).insert(data); + } + + @override + Future>> query(String table, + {dynamic condition}) async { + if (condition == null) { + return await getCollection(table).find().toList(); + } + return await getCollection(table).find(condition).toList(); + } + + @override + Future update( + String table, Map data, dynamic condition) async { + await getCollection(table).update(condition, data); + } +} diff --git a/lib/base/discovery/Discovery.dart b/lib/base/discovery/Discovery.dart new file mode 100644 index 0000000..b63debe --- /dev/null +++ b/lib/base/discovery/Discovery.dart @@ -0,0 +1,44 @@ +abstract class Discovery { + static late Discovery _discovery; + + static Discovery getInstance() { + return _discovery; + } + + static void setInstance(Discovery discovery) { + _discovery = discovery; + } + + // 注册实例到 Nacos + Future registerInstance(String serviceName, String ip, int port, + {String groupName = 'DEFAULT_GROUP'}); + + // 注销实例 + Future deRegisterInstance(String serviceName, String ip, int port, + {String groupName = 'DEFAULT_GROUP'}); + + // 获取服务实例列表 + Future>> getInstanceList(String serviceName, + {String groupName = 'DEFAULT_GROUP'}); + + // 获取配置 + Future getConfig(String dataId, {String group = 'DEFAULT_GROUP'}); + + //获取配置历史记录 + + // 发布配置 + Future publishConfig(String dataId, String group, String content); + + // 删除配置 + Future deleteConfig(String dataId, String group); +} + +class DiscoveryConfig { + String host; + String namespaceId; + String groupName; + DiscoveryConfig( + {required this.host, + required this.namespaceId, + this.groupName = "DEFAULT_GROUP"}); +} diff --git a/lib/base/discovery/impl/NacosDiscovery.dart b/lib/base/discovery/impl/NacosDiscovery.dart new file mode 100644 index 0000000..696d68b --- /dev/null +++ b/lib/base/discovery/impl/NacosDiscovery.dart @@ -0,0 +1,211 @@ +import 'dart:async'; + +import 'package:dio/dio.dart'; +import 'package:EasyDartModule/base/discovery/Discovery.dart'; + +class NacosDiscovery implements Discovery { + // final String host; + // final String namespaceId; + final DiscoveryConfig config; + final Dio dio; + bool healthCheck = true; + + NacosDiscovery(this.config) : dio = Dio(BaseOptions(baseUrl: config.host)); + + // 注册实例到 Nacos + @override + Future registerInstance(String serviceName, String ip, int port, + {String groupName = 'DEFAULT_GROUP'}) async { + try { + final response = await dio.post( + '/nacos/v2/ns/instance', + queryParameters: { + 'serviceName': serviceName, + 'ip': ip, + 'port': port, + 'groupName': groupName, + 'namespaceId': config.namespaceId, + 'ephemeral': true + }, + ); + if (response.statusCode == 200) { + print('服务注册成功: $serviceName'); + healthCheck = true; + //启动定时器 更新服务健康状态 + Future.doWhile(() async { + if (healthCheck) { + await Future.delayed(Duration(seconds: 5), () async { + //发送心跳包 + try { + final rr = await dio + .put("/nacos/v1/ns/instance/beat", queryParameters: { + 'serviceName': serviceName, + 'ip': ip, + 'port': port, + 'groupName': groupName, + 'namespaceId': config.namespaceId, + }); + print(rr); + } catch (e) { + print(e); + } + }); + } + return healthCheck; + }).then((v) { + print("心跳线程结束"); + }); + // Timer.periodic(Duration(seconds: 5), (timer) async { + // //判断是否取消注册 + // if (!healthCheck) { + // timer.cancel(); + // return; + // } + // }); + return true; + } else { + print('服务注册失败: ${response.statusCode} - ${response.data}'); + return false; + } + } catch (e) { + print('请求失败: $e'); + return false; + } + } + + // 注销实例 + @override + Future deRegisterInstance(String serviceName, String ip, int port, + {String groupName = 'DEFAULT_GROUP'}) async { + try { + final response = await dio.delete( + '/nacos/v2/ns/instance', + queryParameters: { + 'serviceName': serviceName, + 'ip': ip, + 'port': port, + 'groupName': groupName, + 'namespaceId': config.namespaceId, + }, + ); + if (response.statusCode == 200) { + print('服务注销成功: $serviceName'); + //标记心跳线程为退出状态 + healthCheck = false; + return true; + } else { + print('服务注销失败: ${response.statusCode} - ${response.data}'); + return false; + } + } catch (e) { + print('请求失败: $e'); + return false; + } + } + + // 获取服务实例列表 + @override + Future>> getInstanceList(String serviceName, + {String groupName = 'DEFAULT_GROUP'}) async { + try { + final response = await dio.get( + '/nacos/v2/ns/instance/list', + queryParameters: { + 'serviceName': serviceName, + 'groupName': groupName, + 'namespaceId': config.namespaceId, + 'healthyOnly': true, + }, + ); + if (response.statusCode == 200) { + final instances = response.data["data"]['hosts']; + // print('服务实例列表: $instances'); + return List>.from(instances); + } else { + print('获取服务实例列表失败: ${response.statusCode} - ${response.data}'); + return []; + } + } catch (e) { + print('请求失败: $e'); + return []; + } + } + + // 获取配置 + @override + Future getConfig(String dataId, + {String group = 'DEFAULT_GROUP'}) async { + try { + final response = await dio.get( + '/nacos/v2/cs/config', + queryParameters: { + 'dataId': dataId, + 'group': group, + 'namespaceId': config.namespaceId, + }, + ); + if (response.statusCode == 200) { + // print('获取配置成功: ${response.data}'); + return response.data["data"]; + } else { + print('获取配置失败: ${response.statusCode} - ${response.data}'); + return ''; + } + } catch (e) { + print('请求失败: $e'); + return ''; + } + } + + // 发布配置 + @override + Future publishConfig( + String dataId, String group, String content) async { + try { + final response = await dio.post( + '/nacos/v2/cs/config', + queryParameters: { + 'dataId': dataId, + 'group': group, + 'namespaceId': config.namespaceId, + 'content': content, + }, + ); + if (response.statusCode == 200) { + print('配置发布成功: $dataId'); + return true; + } else { + print('配置发布失败: ${response.statusCode} - ${response.data}'); + return false; + } + } catch (e) { + print('请求失败: $e'); + return false; + } + } + + // 删除配置 + @override + Future deleteConfig(String dataId, String group) async { + try { + final response = await dio.delete( + '/nacos/v2/cs/config', + queryParameters: { + 'dataId': dataId, + 'group': group, + 'namespaceId': config.namespaceId, + }, + ); + if (response.statusCode == 200) { + print('配置删除成功: $dataId'); + return true; + } else { + print('配置删除失败: ${response.statusCode} - ${response.data}'); + return false; + } + } catch (e) { + print('请求失败: $e'); + return false; + } + } +} diff --git a/lib/base/http/TraceDio.dart b/lib/base/http/TraceDio.dart new file mode 100644 index 0000000..1bfe12a --- /dev/null +++ b/lib/base/http/TraceDio.dart @@ -0,0 +1,119 @@ +import 'package:EasyDartModule/base/logger/Logger.dart'; +import 'package:dio/dio.dart'; +import 'package:shelf/shelf.dart' as sf; + +class TraceDio { + final Dio _dio; + final Logger _logger; + + static late TraceDio _traceDio; + + TraceDio(Logger logger) + : _dio = Dio(), + _logger = logger { + // 配置 Dio + // 设置连接超时 + _dio.options.connectTimeout = Duration(seconds: 5); + // 设置接收超时 + _dio.options.receiveTimeout = Duration(seconds: 5); + //保留原始大小写 + _dio.options.preserveHeaderCase = true; + + // 设置拦截器,自动添加 traceId 和 spanId,并记录日志 + String traceId = "none"; + String spanId = "none"; + _dio.interceptors.add(InterceptorsWrapper( + onRequest: (options, handler) { + // 获取请求中的 traceId(如果没有则生成) + traceId = options.headers['X-Trace-ID']; + spanId = options.headers['X-Span-ID']; + + // 在请求头中添加 traceId 和 spanId + // options.headers['X-Trace-ID'] = traceId; + // options.headers['X-Span-ID'] = spanId; + + // 记录请求日志 + _logger.info( + 'traceId=$traceId, spanId=$spanId Sending request: ${options.method} ${options.uri}', + tag: "DIO"); + return handler.next(options); // 继续请求 + }, + onResponse: (response, handler) { + // 记录响应日志 + _logger.info( + 'traceId=$traceId, spanId=$spanId Response received: ${response.statusCode} ${response.statusMessage}', + tag: "DIO"); + return handler.next(response); // 继续处理响应 + }, + onError: (DioException e, handler) { + // 记录错误日志 + _logger.error( + 'traceId=$traceId, spanId=$spanId Request failed: ${e.message}', + tag: "DIO"); + return handler.next(e); // 继续处理错误 + }, + )); + } + + static TraceDio getInstance() { + return _traceDio; + } + + static void setInstance(TraceDio traceDio) { + _traceDio = traceDio; + } + + Map? getHeader( + {Map? headers, sf.Request? request}) { + if (request != null) { + return { + "X-Trace-ID": request.context['request_trace_id'] as String, + "X-Span-ID": request.context['request_span_id'] as String + }; + } + return null; + } + + // 发起 GET 请求 + Future get(String url, + {Map? queryParameters, sf.Request? request}) async { + return await _dio.get(url, + queryParameters: queryParameters, + options: Options(headers: getHeader(request: request))); + } + + // 发起 POST 请求 + Future post(String url, + {Object? data, + Map? queryParameters, + sf.Request? request}) async { + return await _dio.post(url, + data: data, + queryParameters: queryParameters, + options: Options(headers: getHeader(request: request))); + } + + // 发起 PUT 请求 + Future put(String url, + {Object? data, + Map? queryParameters, + sf.Request? request}) async { + return await _dio.put(url, + data: data, + queryParameters: queryParameters, + options: Options(headers: getHeader(request: request))); + } + +// 发起 DELETE 请求 + Future delete(String url, + {Object? data, + Map? queryParameters, + sf.Request? request}) async { + return await _dio.delete( + url, + data: data, + queryParameters: queryParameters, + options: Options(headers: getHeader(request: request)), + ); + } +} diff --git a/lib/base/logger/Logger.dart b/lib/base/logger/Logger.dart new file mode 100644 index 0000000..614feaa --- /dev/null +++ b/lib/base/logger/Logger.dart @@ -0,0 +1,21 @@ +abstract class Logger { + static late Logger _logger; + + static Logger getInstance() { + return _logger; + } + + static void setInstance(Logger logger) { + _logger = logger; + } + + void info(String msg, {String tag}); + void warning(String msg, {String tag}); + void error(String msg, {String tag}); +} + +class LoggerConfig { + String host; + String serviceName; + LoggerConfig({required this.host, required this.serviceName}); +} diff --git a/lib/base/logger/impl/LokiLogger.dart b/lib/base/logger/impl/LokiLogger.dart new file mode 100644 index 0000000..afe8453 --- /dev/null +++ b/lib/base/logger/impl/LokiLogger.dart @@ -0,0 +1,72 @@ +import 'dart:convert'; +import 'dart:io'; + +import 'package:EasyDartModule/base/logger/Logger.dart'; +import 'package:dio/dio.dart'; + +enum LoggerLevel { + debug(1), + info(2), + warning(3), + error(4), + off(5), + ; + + final int level; + const LoggerLevel(this.level); +} + +class LokiLogger implements Logger { + final LoggerConfig? _config; + final Dio dio; + LoggerLevel level = LoggerLevel.info; + LokiLogger(this._config) + : dio = Dio(BaseOptions( + baseUrl: _config == null ? "" : _config.host, + headers: { + "Content-Type": "application/json", + "Content-Encoding": "gzip" + })); + + @override + void info(String msg, {String? tag}) { + log(msg, level: LoggerLevel.info, tag: tag); + } + + @override + void warning(String msg, {String? tag}) { + log(msg, level: LoggerLevel.warning, tag: tag); + } + + @override + void error(String msg, {String? tag}) { + log(msg, level: LoggerLevel.error, tag: tag); + } + + void log(String msg, {required LoggerLevel level, String? tag}) { + if (level.level < this.level.level) { + //日志等级小于设置的输出日志等级 + return; + } + if (_config == null) { + print("$tag $level $msg"); + } else { + //推送到loki服务器 + //{_config.url} + var now = DateTime.now(); + // 转换为纳秒 + int nanoseconds = now.microsecondsSinceEpoch * 1000; + var zip = gzip.encode(utf8.encode(jsonEncode({ + "streams": [ + { + "stream": {"service_name": _config.serviceName}, + "values": [ + [nanoseconds.toString(), "$tag ${level.name.toUpperCase()} $msg"] + ] + } + ] + }))); + dio.post("/loki/api/v1/push", data: zip); + } + } +} diff --git a/lib/base/mqtt/mqtt.dart b/lib/base/mqtt/mqtt.dart new file mode 100644 index 0000000..35b93e1 --- /dev/null +++ b/lib/base/mqtt/mqtt.dart @@ -0,0 +1,82 @@ +import 'package:mqtt5_client/mqtt5_client.dart'; +import 'package:mqtt5_client/mqtt5_server_client.dart'; + +class Mqtt { + final MqttConfig _config; + MqttClient? _client; + + Mqtt(this._config); + + static late Mqtt _mqtt; + + static Mqtt getInstance() { + return _mqtt; + } + + static void setInstance(Mqtt server) { + _mqtt = server; + } + + Future connect() async { + if (_client != null) { + return true; + } + _client = + MqttServerClient.withPort(_config.host, _config.clientId, _config.port); + _client?.autoReconnect = true; + + await _client?.connect(_config.username, _config.password); + _client?.updates.listen((List> message) { + final recMess = message[0].payload as MqttPublishMessage; + final payload = + MqttUtilities.bytesToStringAsString(recMess.payload.message!); + _config.messgae(message[0].topic!, payload); + }); + _config.topic?.forEach((topic) { + subscribe(topic, _config.qos); + }); + + return true; + } + + void disconnect() { + _client?.disconnect(); + _client = null; + } + + void subscribe(String topic, int qos) { + _client?.subscribe(topic, MqttUtilities.getQosLevel(qos)); + } + + void unSubscribe(String topic) { + _client?.unsubscribeStringTopic(topic); + } + + void publish(String topic, String msg, {int qos = 0}) { + var payload = MqttPayloadBuilder(); + payload.addString(msg); + _client?.publishMessage( + topic, MqttUtilities.getQosLevel(qos), payload.payload!); + } +} + +class MqttConfig { + final String host; + final int port; + final String clientId; + String? username; + String? password; + List? topic; + int qos; + Function(String topic, String message) messgae; + + MqttConfig( + {required this.host, + this.port = 1883, + required this.clientId, + required this.messgae, + this.topic, + this.qos = 0, + this.username, + this.password}); +} diff --git a/lib/base/storage/Storage.dart b/lib/base/storage/Storage.dart new file mode 100644 index 0000000..72270df --- /dev/null +++ b/lib/base/storage/Storage.dart @@ -0,0 +1,36 @@ +import 'dart:typed_data'; + +abstract class Storage { + static late Storage _storage; + + static Storage getInstance() { + return _storage; + } + + static void setInstance(Storage server) { + _storage = server; + } + + Future createBucket(String name); + Future removeBucket(String name); + + Future uploadObject( + String bucketName, String objectName, Uint8List data); + Future getObject(String bucketName, String objectName); + Future deleteObject(String bucketName, String objectName); +} + +class StorageConfig { + final String host; + final int port; + final bool ssl; + final String accessKey; + final String secretKey; + + StorageConfig( + {required this.host, + required this.port, + this.ssl = false, + required this.accessKey, + required this.secretKey}); +} diff --git a/lib/base/storage/impl/MinIoStorage.dart b/lib/base/storage/impl/MinIoStorage.dart new file mode 100644 index 0000000..962a254 --- /dev/null +++ b/lib/base/storage/impl/MinIoStorage.dart @@ -0,0 +1,54 @@ +import 'dart:async'; +import 'dart:typed_data'; + +import 'package:EasyDartModule/base/storage/Storage.dart'; +import 'package:minio/minio.dart'; + +class MinioStorage implements Storage { + final Minio _minio; + final StorageConfig _config; + MinioStorage(this._config) + : _minio = Minio( + endPoint: _config.host, + port: _config.port, + useSSL: _config.ssl, + accessKey: _config.accessKey, + secretKey: _config.secretKey); + + @override + Future createBucket(String name) async { + if (await _minio.bucketExists(name)) { + return; + } + return await _minio.makeBucket(name); + } + + @override + Future removeBucket(String name) async { + return await _minio.removeBucket(name); + } + + @override + Future deleteObject(String bucketName, String objectName) async { + return await _minio.removeObject(bucketName, objectName); + } + + @override + Future getObject(String bucketName, String objectName) async { + MinioByteStream stream = await _minio.getObject(bucketName, objectName); + // 将 Stream> 转为 List + final List bytes = await stream + .toList() + .then((chunks) => chunks.expand((chunk) => chunk).toList()); + + // 转换为 Uint8List + return Uint8List.fromList(bytes); + } + + @override + Future uploadObject( + String bucketName, String objectName, Uint8List data) async { + await _minio.putObject(bucketName, objectName, Stream.fromIterable([data])); + return "http${_config.ssl ? "s" : ""}://${_config.host}:${_config.port}/$bucketName/$objectName"; + } +} diff --git a/lib/base/webserver/WebServer.dart b/lib/base/webserver/WebServer.dart new file mode 100644 index 0000000..8df7b82 --- /dev/null +++ b/lib/base/webserver/WebServer.dart @@ -0,0 +1,32 @@ +abstract class WebServer { + static late WebServer _webServer; + + static WebServer getInstance() { + return _webServer; + } + + static void setInstance(WebServer server) { + _webServer = server; + } + + void start(int port); + void stop(); + void addHandler(handler); +} + +enum HttpMethod { + GET, + POST, + PUT, + DELETE, + ALL, + WS, + ; +} + +class RequestMapping { + final HttpMethod method; + final String path; + + const RequestMapping({this.method = HttpMethod.ALL, required this.path}); +} diff --git a/lib/base/webserver/impl/ShelfWebServer.dart b/lib/base/webserver/impl/ShelfWebServer.dart new file mode 100644 index 0000000..d1a6162 --- /dev/null +++ b/lib/base/webserver/impl/ShelfWebServer.dart @@ -0,0 +1,212 @@ +import 'dart:io'; +import 'dart:mirrors'; + +import 'package:EasyDartModule/EasyDartModule.dart'; +import 'package:EasyDartModule/base/logger/Logger.dart'; +import 'package:EasyDartModule/base/webserver/WebServer.dart'; +import 'package:shelf/shelf.dart'; +import 'package:shelf/shelf_io.dart'; +import 'package:shelf_router/shelf_router.dart'; +import 'package:shelf_web_socket/shelf_web_socket.dart'; +import 'package:uuid/uuid.dart'; + +class ShelfWebServer implements WebServer { + late Logger logger; + HttpServer? _server; + final Router _router = Router(); + final Uuid uuid = Uuid(); + final Map _wsCall = {}; + + ShelfWebServer(this.logger); + final String tag = "webserver"; + + // 中间件:记录请求日志 + Middleware logRequests() { + return (Handler innerHandler) { + return (Request request) async { + final requestId = request.context['request_trace_id'] as String; + final requestSpanId = request.context['request_span_id'] as String; + final requestParentSpanId = + request.context['request_parent_span_id'] as String?; + final String logPer = + "traceId=$requestId spanId=$requestSpanId parentSpanId=$requestParentSpanId"; + logger.info('$logPer | 请求路径: ${request.requestedUri}', tag: tag); + // final stopwatch = Stopwatch()..start(); + final stopwatch = request.context["request_stop_watch"] as Stopwatch; + stopwatch.start(); + Response response; + try { + response = await innerHandler(request); + stopwatch.stop(); + logger.info( + '$logPer | 响应状态码: ${response.statusCode} | 响应时间: ${stopwatch.elapsedMilliseconds}ms', + tag: tag); + } catch (e, s) { + if (e is HijackException) { + //不能处理该异常直接抛出 + throw e; + } + stopwatch.stop(); + logger.error("$logPer | 服务器错误 | ${e.toString()} ${s.toString()}", + tag: tag); + response = Response(500, + body: "Internal Server Error\r\nTraceId:$requestId"); + } + return response; + }; + }; + } + +// 中间件:为每个请求生成唯一的请求ID + Middleware requestIdMiddleware() { + return (Handler innerHandler) { + return (Request request) async { + // 获取请求头中的 X-Request-ID,如果没有则生成一个 + final requestId = request.headers['X-Trace-ID'] ?? uuid.v4(); + String? spanId = request.headers['X-Span-ID']; + String? parentSpanId; + if (spanId != null) { + parentSpanId = spanId; + } + spanId = uuid.v4(); + final updatedRequest = request.change(context: { + 'request_trace_id': requestId, + "request_span_id": spanId, + "request_parent_span_id": parentSpanId, + "request_stop_watch": Stopwatch() + }); + return innerHandler(updatedRequest); + }; + }; + } + + Future _routerHandler(Request request) async { + String path = request.url.path; + if (_wsCall.containsKey(path)) { + final String spanId = request.context['request_span_id'] as String; + final String requestId = request.context['request_trace_id'] as String; + final String? requestParentSpanId = + request.context['request_parent_span_id'] as String?; + return await webSocketHandler((channel) { + _wsCall[path]!.chanelMap[spanId] = channel; + _wsCall[path]!.open(spanId); + // 监听 WebSocket 消息 + channel.stream.listen((message) { + print('Received message: $message'); + // 处理消息并回应客户端 + // channel.sink.add(message); + _wsCall[path]!.message(spanId, message); + }, onDone: () { + try { + _wsCall[path]!.close(spanId); + _wsCall[path]!.chanelMap.remove(spanId); + } catch (e) { + print(e); + } + + final stopwatch = request.context["request_stop_watch"] as Stopwatch; + stopwatch.stop(); + logger.info( + 'traceId=$requestId spanId=$spanId parentSpanId=$requestParentSpanId | WebSocket closed | 连接时长: ${stopwatch.elapsedMilliseconds}ms', + tag: "webserver"); + }, onError: (error) { + logger.info( + 'traceId=$requestId spanId=$spanId parentSpanId=$requestParentSpanId | WebSocket error: $error', + tag: "webserver"); + }); + })(request); + } else { + return await _router.call(request); + } + } + + @override + void start(int port) async { + //反射获取全部路由地址 + final handler = Pipeline() + .addMiddleware(requestIdMiddleware()) + .addMiddleware(logRequests()) + .addHandler(_routerHandler); + _server = await serve(handler, InternetAddress.anyIPv4, port); + print('Server listening on port ${_server?.port}'); + } + + @override + void stop() { + _server?.close(); + } + + @override + void addHandler(handler) { + ClassMirror cm = reflectClass(handler.runtimeType); + var im = reflect(handler); + String path = ""; + for (var metadata in cm.metadata) { + // 检查元数据是否为RequestMapping类型 + if (metadata.reflectee is RequestMapping) { + // 获取实例 + RequestMapping annotation = metadata.reflectee; + path = annotation.path; + if (path.endsWith("/")) { + path = path.substring(0, path.length - 1); + } + if (annotation.method == HttpMethod.WS) { + //wwebsocket处理句柄直接加入路由 + addRouter(annotation.method, path, handler); + return; + } + break; + } + } + + im.type.declarations.forEach((k, v) { + if (v is MethodMirror) { + for (var m in v.metadata) { + if (m.reflectee is RequestMapping) { + var mp = m.reflectee.path as String; + + if (mp.startsWith("/")) { + mp = mp.substring(1, mp.length); + } + + String p = "$path/$mp"; + // print("method: ${m.reflectee.method} $p"); + //把地址加入路由 + + addRouter(m.reflectee.method, p, (req, [a, b, c, d, e, f]) { + return im.invoke(v.simpleName, [req]).reflectee; + }); + } + } + } + }); + } + + void addRouter(HttpMethod method, String path, dynamic handler) { + if (method == HttpMethod.WS) { + //添加websocket连接处理函数 + if (handler is WebSocketHandler) { + _wsCall[path] = handler; + } else { + print("回调函数类型错误 需要 WebSocketHandler "); + } + } else { + _router.add(method.name, path, handler); + } + } +} + +abstract class WebSocketHandler { + Map chanelMap = {}; + void open(String id); + void close(String id); + void message(String id, dynamic message); + + void sendData(String id, dynamic data) { + if (chanelMap.containsKey(id)) { + chanelMap[id].sink.add(data); + } else { + print("id未找到"); + } + } +} diff --git a/pubspec.yaml b/pubspec.yaml new file mode 100644 index 0000000..cb2e954 --- /dev/null +++ b/pubspec.yaml @@ -0,0 +1,24 @@ +name: EasyDartModule +description: A starting point for Dart libraries or applications. +version: 1.0.0 +# repository: https://github.com/my_org/my_repo + +environment: + sdk: ^3.5.4 + +# Add regular dependencies here. +dependencies: + # path: ^1.8.0 + grpc: ^4.0.1 + dio: ^5.0.0 + mongo_dart: ^0.10.3 + shelf: ^1.4.2 + shelf_router: ^1.1.4 + shelf_web_socket: ^2.0.1 + uuid: ^4.5.1 + mqtt5_client: ^4.6.2 + minio: ^3.5.7 + +dev_dependencies: + lints: ^4.0.0 + test: ^1.24.0