首次提交

This commit is contained in:
qmqz
2025-01-02 10:08:03 +08:00
commit fd62ed3d98
16 changed files with 1143 additions and 0 deletions

7
.gitignore vendored Normal file
View File

@@ -0,0 +1,7 @@
# https://dart.dev/guides/libraries/private-files
# Created by `dart pub`
.dart_tool/
# Avoid committing pubspec.lock for library packages; see
# https://dart.dev/guides/libraries/private-files#pubspeclock.
pubspec.lock

View File

@@ -0,0 +1,83 @@
import 'dart:convert';
import 'package:EasyDartModule/EasyDartModule.dart';
import 'package:EasyDartModule/base/database/DataBase.dart';
import 'package:EasyDartModule/base/discovery/Discovery.dart';
import 'package:EasyDartModule/base/logger/Logger.dart';
import 'package:EasyDartModule/base/storage/Storage.dart';
import 'package:EasyDartModule/base/webserver/WebServer.dart';
void main() async {
//初始化服务发现
EasyDartModule.init(
discoveryConfig: DiscoveryConfig(
host: "http://10.20.1.2:8848",
namespaceId: "d3b43bfe-f584-4b8f-a390-353abc69c856"));
String ip = "10.20.0.80";
int port = 9100;
String sn = "test-server";
//注册实例
await EasyDartModule.discovery.registerInstance(sn, ip, port);
//查询日志服务配置信息
String logger = await EasyDartModule.discovery.getConfig("logger");
var loggerConfig = jsonDecode(logger);
//查询数据库配置信息
String mongodb = await EasyDartModule.discovery.getConfig("mongodb");
var mongodbConfig = jsonDecode(mongodb);
//查询存储配置
String storage = await EasyDartModule.discovery.getConfig("storage");
var storageConfig = jsonDecode(storage);
EasyDartModule.init(
loggerConfig: LoggerConfig(host: loggerConfig["host"], serviceName: sn),
dataBaseConfig: DataBaseConfig(
host: mongodbConfig["host"],
userName: mongodbConfig["userName"],
password: mongodbConfig["password"],
dataBase: mongodbConfig["dataBase"]),
storageConfig: StorageConfig(
host: storageConfig["host"],
port: storageConfig["port"],
accessKey: storageConfig["accessKey"],
secretKey: storageConfig["secretKey"]));
EasyDartModule.webServer.addHandler(TestController());
EasyDartModule.webServer.start(port);
//测试db
Future.delayed(Duration(seconds: 5), () {
var db = EasyDartModule.dataBase;
var r = db.query("uc_sys_user");
print(r);
});
//测试minio
String bucketName = "test-bu";
String objectName = "/aa/bb/objectName.gif";
// EasyDartModule.storage.createBucket(bucketName).then((v) {
// EasyDartModule.storage
// .uploadObject(
// bucketName,
// objectName,
// File("F:\\qq\\11603720\\Image\\Group2\\\$C\\WL\\\$CWLJBX}@Z7E487L2J4MH`G.gif")
// .readAsBytesSync())
// .then((path) {
// print("上传文件: $path");
// EasyDartModule.storage.getObject(bucketName, objectName).then((data) {
// print("下载文件");
// File("a.gif").writeAsBytesSync(data);
// print("下载完毕");
// });
// });
// // EasyDartModule.storage.deleteObject(bucketName, "objectName");
// });
}
@RequestMapping(path: "/test")
class TestController {
@RequestMapping(path: "/tt", method: HttpMethod.GET)
Response test(Request request) {
return Response(200, body: "ok");
}
}

59
lib/EasyDartModule.dart Normal file
View File

@@ -0,0 +1,59 @@
library EasyDartModule;
import 'package:EasyDartModule/base/database/DataBase.dart';
import 'package:EasyDartModule/base/database/impl/MongoDb.dart';
import 'package:EasyDartModule/base/discovery/Discovery.dart';
import 'package:EasyDartModule/base/discovery/impl/NacosDiscovery.dart';
import 'package:EasyDartModule/base/http/TraceDio.dart';
import 'package:EasyDartModule/base/logger/Logger.dart';
import 'package:EasyDartModule/base/logger/impl/LokiLogger.dart';
import 'package:EasyDartModule/base/mqtt/mqtt.dart';
import 'package:EasyDartModule/base/storage/Storage.dart';
import 'package:EasyDartModule/base/storage/impl/MinIoStorage.dart';
import 'package:EasyDartModule/base/webserver/WebServer.dart';
import 'package:EasyDartModule/base/webserver/impl/ShelfWebServer.dart';
export 'package:shelf/shelf.dart';
export 'package:mongo_dart/mongo_dart.dart';
class EasyDartModule {
static Discovery get discovery => Discovery.getInstance();
static DataBase get dataBase => DataBase.getInstance();
static WebServer get webServer => WebServer.getInstance();
static Logger get logger => Logger.getInstance();
static TraceDio get dio => TraceDio.getInstance();
static Mqtt get mqtt => Mqtt.getInstance();
static Storage get storage => Storage.getInstance();
static bool init(
{DiscoveryConfig? discoveryConfig,
DataBaseConfig? dataBaseConfig,
LoggerConfig? loggerConfig,
MqttConfig? mqttConfig,
StorageConfig? storageConfig}) {
if (discoveryConfig != null) {
//nacos注册配置中心
Discovery.setInstance(NacosDiscovery(discoveryConfig));
}
if (dataBaseConfig != null) {
//mongo数据库
DataBase.setInstance(MongoDb(dataBaseConfig));
}
if (mqttConfig != null) {
//mqtt
Mqtt.setInstance(Mqtt(mqttConfig));
}
if (storageConfig != null) {
//s3存储
Storage.setInstance(MinioStorage(storageConfig));
}
if (loggerConfig != null) {
//初始化日志
Logger.setInstance(LokiLogger(loggerConfig));
//web服务器
WebServer.setInstance(ShelfWebServer(logger));
//Dio组件
TraceDio.setInstance(TraceDio(logger));
}
return true;
}
}

View File

@@ -0,0 +1,36 @@
abstract class DataBase {
static late DataBase _dataBase;
static DataBase getInstance() {
return _dataBase;
}
static void setInstance(DataBase database) {
_dataBase = database;
}
// 执行查询操作
Future<List<Map<String, dynamic>>> query(String table, {dynamic condition});
// 执行插入操作
Future<void> insert(String table, Map<String, dynamic> data);
// 执行更新操作
Future<void> update(
String table, Map<String, dynamic> data, dynamic condition);
// 执行删除操作
Future<void> delete(String table, dynamic condition);
}
class DataBaseConfig {
String host;
String userName;
String password;
String dataBase;
DataBaseConfig(
{required this.host,
required this.userName,
required this.password,
required this.dataBase});
}

View File

@@ -0,0 +1,51 @@
import 'package:EasyDartModule/base/database/DataBase.dart';
import 'package:mongo_dart/mongo_dart.dart';
class MongoDb implements DataBase {
final DataBaseConfig config;
final Db db;
MongoDb(this.config)
: db = Db(
"mongodb://${config.userName}:${config.password}@${config.host}/${config.dataBase}?authSource=admin") {
Future.delayed(Duration(seconds: 1), () async {
do {
try {
await db.open();
print('Connected successfully!');
} catch (e) {
print('Connection error: $e');
await Future.delayed(Duration(seconds: 1));
}
} while (!db.isConnected);
});
}
DbCollection getCollection(String name) {
return db.collection(name);
}
@override
Future<void> delete(String table, dynamic condition) async {
await getCollection(table).deleteMany(condition);
}
@override
Future<void> insert(String table, Map<String, dynamic> data) async {
await getCollection(table).insert(data);
}
@override
Future<List<Map<String, dynamic>>> query(String table,
{dynamic condition}) async {
if (condition == null) {
return await getCollection(table).find().toList();
}
return await getCollection(table).find(condition).toList();
}
@override
Future<void> update(
String table, Map<String, dynamic> data, dynamic condition) async {
await getCollection(table).update(condition, data);
}
}

View File

@@ -0,0 +1,44 @@
abstract class Discovery {
static late Discovery _discovery;
static Discovery getInstance() {
return _discovery;
}
static void setInstance(Discovery discovery) {
_discovery = discovery;
}
// 注册实例到 Nacos
Future<bool> registerInstance(String serviceName, String ip, int port,
{String groupName = 'DEFAULT_GROUP'});
// 注销实例
Future<bool> deRegisterInstance(String serviceName, String ip, int port,
{String groupName = 'DEFAULT_GROUP'});
// 获取服务实例列表
Future<List<Map<String, dynamic>>> getInstanceList(String serviceName,
{String groupName = 'DEFAULT_GROUP'});
// 获取配置
Future<String> getConfig(String dataId, {String group = 'DEFAULT_GROUP'});
//获取配置历史记录
// 发布配置
Future<bool> publishConfig(String dataId, String group, String content);
// 删除配置
Future<bool> deleteConfig(String dataId, String group);
}
class DiscoveryConfig {
String host;
String namespaceId;
String groupName;
DiscoveryConfig(
{required this.host,
required this.namespaceId,
this.groupName = "DEFAULT_GROUP"});
}

View File

@@ -0,0 +1,211 @@
import 'dart:async';
import 'package:dio/dio.dart';
import 'package:EasyDartModule/base/discovery/Discovery.dart';
class NacosDiscovery implements Discovery {
// final String host;
// final String namespaceId;
final DiscoveryConfig config;
final Dio dio;
bool healthCheck = true;
NacosDiscovery(this.config) : dio = Dio(BaseOptions(baseUrl: config.host));
// 注册实例到 Nacos
@override
Future<bool> registerInstance(String serviceName, String ip, int port,
{String groupName = 'DEFAULT_GROUP'}) async {
try {
final response = await dio.post(
'/nacos/v2/ns/instance',
queryParameters: {
'serviceName': serviceName,
'ip': ip,
'port': port,
'groupName': groupName,
'namespaceId': config.namespaceId,
'ephemeral': true
},
);
if (response.statusCode == 200) {
print('服务注册成功: $serviceName');
healthCheck = true;
//启动定时器 更新服务健康状态
Future.doWhile(() async {
if (healthCheck) {
await Future.delayed(Duration(seconds: 5), () async {
//发送心跳包
try {
final rr = await dio
.put("/nacos/v1/ns/instance/beat", queryParameters: {
'serviceName': serviceName,
'ip': ip,
'port': port,
'groupName': groupName,
'namespaceId': config.namespaceId,
});
print(rr);
} catch (e) {
print(e);
}
});
}
return healthCheck;
}).then((v) {
print("心跳线程结束");
});
// Timer.periodic(Duration(seconds: 5), (timer) async {
// //判断是否取消注册
// if (!healthCheck) {
// timer.cancel();
// return;
// }
// });
return true;
} else {
print('服务注册失败: ${response.statusCode} - ${response.data}');
return false;
}
} catch (e) {
print('请求失败: $e');
return false;
}
}
// 注销实例
@override
Future<bool> deRegisterInstance(String serviceName, String ip, int port,
{String groupName = 'DEFAULT_GROUP'}) async {
try {
final response = await dio.delete(
'/nacos/v2/ns/instance',
queryParameters: {
'serviceName': serviceName,
'ip': ip,
'port': port,
'groupName': groupName,
'namespaceId': config.namespaceId,
},
);
if (response.statusCode == 200) {
print('服务注销成功: $serviceName');
//标记心跳线程为退出状态
healthCheck = false;
return true;
} else {
print('服务注销失败: ${response.statusCode} - ${response.data}');
return false;
}
} catch (e) {
print('请求失败: $e');
return false;
}
}
// 获取服务实例列表
@override
Future<List<Map<String, dynamic>>> getInstanceList(String serviceName,
{String groupName = 'DEFAULT_GROUP'}) async {
try {
final response = await dio.get(
'/nacos/v2/ns/instance/list',
queryParameters: {
'serviceName': serviceName,
'groupName': groupName,
'namespaceId': config.namespaceId,
'healthyOnly': true,
},
);
if (response.statusCode == 200) {
final instances = response.data["data"]['hosts'];
// print('服务实例列表: $instances');
return List<Map<String, dynamic>>.from(instances);
} else {
print('获取服务实例列表失败: ${response.statusCode} - ${response.data}');
return [];
}
} catch (e) {
print('请求失败: $e');
return [];
}
}
// 获取配置
@override
Future<String> getConfig(String dataId,
{String group = 'DEFAULT_GROUP'}) async {
try {
final response = await dio.get(
'/nacos/v2/cs/config',
queryParameters: {
'dataId': dataId,
'group': group,
'namespaceId': config.namespaceId,
},
);
if (response.statusCode == 200) {
// print('获取配置成功: ${response.data}');
return response.data["data"];
} else {
print('获取配置失败: ${response.statusCode} - ${response.data}');
return '';
}
} catch (e) {
print('请求失败: $e');
return '';
}
}
// 发布配置
@override
Future<bool> publishConfig(
String dataId, String group, String content) async {
try {
final response = await dio.post(
'/nacos/v2/cs/config',
queryParameters: {
'dataId': dataId,
'group': group,
'namespaceId': config.namespaceId,
'content': content,
},
);
if (response.statusCode == 200) {
print('配置发布成功: $dataId');
return true;
} else {
print('配置发布失败: ${response.statusCode} - ${response.data}');
return false;
}
} catch (e) {
print('请求失败: $e');
return false;
}
}
// 删除配置
@override
Future<bool> deleteConfig(String dataId, String group) async {
try {
final response = await dio.delete(
'/nacos/v2/cs/config',
queryParameters: {
'dataId': dataId,
'group': group,
'namespaceId': config.namespaceId,
},
);
if (response.statusCode == 200) {
print('配置删除成功: $dataId');
return true;
} else {
print('配置删除失败: ${response.statusCode} - ${response.data}');
return false;
}
} catch (e) {
print('请求失败: $e');
return false;
}
}
}

119
lib/base/http/TraceDio.dart Normal file
View File

@@ -0,0 +1,119 @@
import 'package:EasyDartModule/base/logger/Logger.dart';
import 'package:dio/dio.dart';
import 'package:shelf/shelf.dart' as sf;
class TraceDio {
final Dio _dio;
final Logger _logger;
static late TraceDio _traceDio;
TraceDio(Logger logger)
: _dio = Dio(),
_logger = logger {
// 配置 Dio
// 设置连接超时
_dio.options.connectTimeout = Duration(seconds: 5);
// 设置接收超时
_dio.options.receiveTimeout = Duration(seconds: 5);
//保留原始大小写
_dio.options.preserveHeaderCase = true;
// 设置拦截器,自动添加 traceId 和 spanId并记录日志
String traceId = "none";
String spanId = "none";
_dio.interceptors.add(InterceptorsWrapper(
onRequest: (options, handler) {
// 获取请求中的 traceId如果没有则生成
traceId = options.headers['X-Trace-ID'];
spanId = options.headers['X-Span-ID'];
// 在请求头中添加 traceId 和 spanId
// options.headers['X-Trace-ID'] = traceId;
// options.headers['X-Span-ID'] = spanId;
// 记录请求日志
_logger.info(
'traceId=$traceId, spanId=$spanId Sending request: ${options.method} ${options.uri}',
tag: "DIO");
return handler.next(options); // 继续请求
},
onResponse: (response, handler) {
// 记录响应日志
_logger.info(
'traceId=$traceId, spanId=$spanId Response received: ${response.statusCode} ${response.statusMessage}',
tag: "DIO");
return handler.next(response); // 继续处理响应
},
onError: (DioException e, handler) {
// 记录错误日志
_logger.error(
'traceId=$traceId, spanId=$spanId Request failed: ${e.message}',
tag: "DIO");
return handler.next(e); // 继续处理错误
},
));
}
static TraceDio getInstance() {
return _traceDio;
}
static void setInstance(TraceDio traceDio) {
_traceDio = traceDio;
}
Map<String, dynamic>? getHeader(
{Map<String, dynamic>? headers, sf.Request? request}) {
if (request != null) {
return {
"X-Trace-ID": request.context['request_trace_id'] as String,
"X-Span-ID": request.context['request_span_id'] as String
};
}
return null;
}
// 发起 GET 请求
Future<Response> get(String url,
{Map<String, dynamic>? queryParameters, sf.Request? request}) async {
return await _dio.get(url,
queryParameters: queryParameters,
options: Options(headers: getHeader(request: request)));
}
// 发起 POST 请求
Future<Response> post(String url,
{Object? data,
Map<String, dynamic>? queryParameters,
sf.Request? request}) async {
return await _dio.post(url,
data: data,
queryParameters: queryParameters,
options: Options(headers: getHeader(request: request)));
}
// 发起 PUT 请求
Future<Response> put(String url,
{Object? data,
Map<String, dynamic>? queryParameters,
sf.Request? request}) async {
return await _dio.put(url,
data: data,
queryParameters: queryParameters,
options: Options(headers: getHeader(request: request)));
}
// 发起 DELETE 请求
Future<Response> delete(String url,
{Object? data,
Map<String, dynamic>? queryParameters,
sf.Request? request}) async {
return await _dio.delete(
url,
data: data,
queryParameters: queryParameters,
options: Options(headers: getHeader(request: request)),
);
}
}

View File

@@ -0,0 +1,21 @@
abstract class Logger {
static late Logger _logger;
static Logger getInstance() {
return _logger;
}
static void setInstance(Logger logger) {
_logger = logger;
}
void info(String msg, {String tag});
void warning(String msg, {String tag});
void error(String msg, {String tag});
}
class LoggerConfig {
String host;
String serviceName;
LoggerConfig({required this.host, required this.serviceName});
}

View File

@@ -0,0 +1,72 @@
import 'dart:convert';
import 'dart:io';
import 'package:EasyDartModule/base/logger/Logger.dart';
import 'package:dio/dio.dart';
enum LoggerLevel {
debug(1),
info(2),
warning(3),
error(4),
off(5),
;
final int level;
const LoggerLevel(this.level);
}
class LokiLogger implements Logger {
final LoggerConfig? _config;
final Dio dio;
LoggerLevel level = LoggerLevel.info;
LokiLogger(this._config)
: dio = Dio(BaseOptions(
baseUrl: _config == null ? "" : _config.host,
headers: {
"Content-Type": "application/json",
"Content-Encoding": "gzip"
}));
@override
void info(String msg, {String? tag}) {
log(msg, level: LoggerLevel.info, tag: tag);
}
@override
void warning(String msg, {String? tag}) {
log(msg, level: LoggerLevel.warning, tag: tag);
}
@override
void error(String msg, {String? tag}) {
log(msg, level: LoggerLevel.error, tag: tag);
}
void log(String msg, {required LoggerLevel level, String? tag}) {
if (level.level < this.level.level) {
//日志等级小于设置的输出日志等级
return;
}
if (_config == null) {
print("$tag $level $msg");
} else {
//推送到loki服务器
//{_config.url}
var now = DateTime.now();
// 转换为纳秒
int nanoseconds = now.microsecondsSinceEpoch * 1000;
var zip = gzip.encode(utf8.encode(jsonEncode({
"streams": [
{
"stream": {"service_name": _config.serviceName},
"values": [
[nanoseconds.toString(), "$tag ${level.name.toUpperCase()} $msg"]
]
}
]
})));
dio.post("/loki/api/v1/push", data: zip);
}
}
}

82
lib/base/mqtt/mqtt.dart Normal file
View File

@@ -0,0 +1,82 @@
import 'package:mqtt5_client/mqtt5_client.dart';
import 'package:mqtt5_client/mqtt5_server_client.dart';
class Mqtt {
final MqttConfig _config;
MqttClient? _client;
Mqtt(this._config);
static late Mqtt _mqtt;
static Mqtt getInstance() {
return _mqtt;
}
static void setInstance(Mqtt server) {
_mqtt = server;
}
Future<bool> connect() async {
if (_client != null) {
return true;
}
_client =
MqttServerClient.withPort(_config.host, _config.clientId, _config.port);
_client?.autoReconnect = true;
await _client?.connect(_config.username, _config.password);
_client?.updates.listen((List<MqttReceivedMessage<MqttMessage>> message) {
final recMess = message[0].payload as MqttPublishMessage;
final payload =
MqttUtilities.bytesToStringAsString(recMess.payload.message!);
_config.messgae(message[0].topic!, payload);
});
_config.topic?.forEach((topic) {
subscribe(topic, _config.qos);
});
return true;
}
void disconnect() {
_client?.disconnect();
_client = null;
}
void subscribe(String topic, int qos) {
_client?.subscribe(topic, MqttUtilities.getQosLevel(qos));
}
void unSubscribe(String topic) {
_client?.unsubscribeStringTopic(topic);
}
void publish(String topic, String msg, {int qos = 0}) {
var payload = MqttPayloadBuilder();
payload.addString(msg);
_client?.publishMessage(
topic, MqttUtilities.getQosLevel(qos), payload.payload!);
}
}
class MqttConfig {
final String host;
final int port;
final String clientId;
String? username;
String? password;
List<String>? topic;
int qos;
Function(String topic, String message) messgae;
MqttConfig(
{required this.host,
this.port = 1883,
required this.clientId,
required this.messgae,
this.topic,
this.qos = 0,
this.username,
this.password});
}

View File

@@ -0,0 +1,36 @@
import 'dart:typed_data';
abstract class Storage {
static late Storage _storage;
static Storage getInstance() {
return _storage;
}
static void setInstance(Storage server) {
_storage = server;
}
Future<void> createBucket(String name);
Future<void> removeBucket(String name);
Future<String> uploadObject(
String bucketName, String objectName, Uint8List data);
Future<Uint8List> getObject(String bucketName, String objectName);
Future<void> deleteObject(String bucketName, String objectName);
}
class StorageConfig {
final String host;
final int port;
final bool ssl;
final String accessKey;
final String secretKey;
StorageConfig(
{required this.host,
required this.port,
this.ssl = false,
required this.accessKey,
required this.secretKey});
}

View File

@@ -0,0 +1,54 @@
import 'dart:async';
import 'dart:typed_data';
import 'package:EasyDartModule/base/storage/Storage.dart';
import 'package:minio/minio.dart';
class MinioStorage implements Storage {
final Minio _minio;
final StorageConfig _config;
MinioStorage(this._config)
: _minio = Minio(
endPoint: _config.host,
port: _config.port,
useSSL: _config.ssl,
accessKey: _config.accessKey,
secretKey: _config.secretKey);
@override
Future<void> createBucket(String name) async {
if (await _minio.bucketExists(name)) {
return;
}
return await _minio.makeBucket(name);
}
@override
Future<void> removeBucket(String name) async {
return await _minio.removeBucket(name);
}
@override
Future<void> deleteObject(String bucketName, String objectName) async {
return await _minio.removeObject(bucketName, objectName);
}
@override
Future<Uint8List> getObject(String bucketName, String objectName) async {
MinioByteStream stream = await _minio.getObject(bucketName, objectName);
// 将 Stream<List<int>> 转为 List<int>
final List<int> bytes = await stream
.toList()
.then((chunks) => chunks.expand((chunk) => chunk).toList());
// 转换为 Uint8List
return Uint8List.fromList(bytes);
}
@override
Future<String> uploadObject(
String bucketName, String objectName, Uint8List data) async {
await _minio.putObject(bucketName, objectName, Stream.fromIterable([data]));
return "http${_config.ssl ? "s" : ""}://${_config.host}:${_config.port}/$bucketName/$objectName";
}
}

View File

@@ -0,0 +1,32 @@
abstract class WebServer {
static late WebServer _webServer;
static WebServer getInstance() {
return _webServer;
}
static void setInstance(WebServer server) {
_webServer = server;
}
void start(int port);
void stop();
void addHandler(handler);
}
enum HttpMethod {
GET,
POST,
PUT,
DELETE,
ALL,
WS,
;
}
class RequestMapping {
final HttpMethod method;
final String path;
const RequestMapping({this.method = HttpMethod.ALL, required this.path});
}

View File

@@ -0,0 +1,212 @@
import 'dart:io';
import 'dart:mirrors';
import 'package:EasyDartModule/EasyDartModule.dart';
import 'package:EasyDartModule/base/logger/Logger.dart';
import 'package:EasyDartModule/base/webserver/WebServer.dart';
import 'package:shelf/shelf.dart';
import 'package:shelf/shelf_io.dart';
import 'package:shelf_router/shelf_router.dart';
import 'package:shelf_web_socket/shelf_web_socket.dart';
import 'package:uuid/uuid.dart';
class ShelfWebServer implements WebServer {
late Logger logger;
HttpServer? _server;
final Router _router = Router();
final Uuid uuid = Uuid();
final Map<String, WebSocketHandler> _wsCall = {};
ShelfWebServer(this.logger);
final String tag = "webserver";
// 中间件:记录请求日志
Middleware logRequests() {
return (Handler innerHandler) {
return (Request request) async {
final requestId = request.context['request_trace_id'] as String;
final requestSpanId = request.context['request_span_id'] as String;
final requestParentSpanId =
request.context['request_parent_span_id'] as String?;
final String logPer =
"traceId=$requestId spanId=$requestSpanId parentSpanId=$requestParentSpanId";
logger.info('$logPer | 请求路径: ${request.requestedUri}', tag: tag);
// final stopwatch = Stopwatch()..start();
final stopwatch = request.context["request_stop_watch"] as Stopwatch;
stopwatch.start();
Response response;
try {
response = await innerHandler(request);
stopwatch.stop();
logger.info(
'$logPer | 响应状态码: ${response.statusCode} | 响应时间: ${stopwatch.elapsedMilliseconds}ms',
tag: tag);
} catch (e, s) {
if (e is HijackException) {
//不能处理该异常直接抛出
throw e;
}
stopwatch.stop();
logger.error("$logPer | 服务器错误 | ${e.toString()} ${s.toString()}",
tag: tag);
response = Response(500,
body: "Internal Server Error\r\nTraceId:$requestId");
}
return response;
};
};
}
// 中间件为每个请求生成唯一的请求ID
Middleware requestIdMiddleware() {
return (Handler innerHandler) {
return (Request request) async {
// 获取请求头中的 X-Request-ID如果没有则生成一个
final requestId = request.headers['X-Trace-ID'] ?? uuid.v4();
String? spanId = request.headers['X-Span-ID'];
String? parentSpanId;
if (spanId != null) {
parentSpanId = spanId;
}
spanId = uuid.v4();
final updatedRequest = request.change(context: {
'request_trace_id': requestId,
"request_span_id": spanId,
"request_parent_span_id": parentSpanId,
"request_stop_watch": Stopwatch()
});
return innerHandler(updatedRequest);
};
};
}
Future<Response> _routerHandler(Request request) async {
String path = request.url.path;
if (_wsCall.containsKey(path)) {
final String spanId = request.context['request_span_id'] as String;
final String requestId = request.context['request_trace_id'] as String;
final String? requestParentSpanId =
request.context['request_parent_span_id'] as String?;
return await webSocketHandler((channel) {
_wsCall[path]!.chanelMap[spanId] = channel;
_wsCall[path]!.open(spanId);
// 监听 WebSocket 消息
channel.stream.listen((message) {
print('Received message: $message');
// 处理消息并回应客户端
// channel.sink.add(message);
_wsCall[path]!.message(spanId, message);
}, onDone: () {
try {
_wsCall[path]!.close(spanId);
_wsCall[path]!.chanelMap.remove(spanId);
} catch (e) {
print(e);
}
final stopwatch = request.context["request_stop_watch"] as Stopwatch;
stopwatch.stop();
logger.info(
'traceId=$requestId spanId=$spanId parentSpanId=$requestParentSpanId | WebSocket closed | 连接时长: ${stopwatch.elapsedMilliseconds}ms',
tag: "webserver");
}, onError: (error) {
logger.info(
'traceId=$requestId spanId=$spanId parentSpanId=$requestParentSpanId | WebSocket error: $error',
tag: "webserver");
});
})(request);
} else {
return await _router.call(request);
}
}
@override
void start(int port) async {
//反射获取全部路由地址
final handler = Pipeline()
.addMiddleware(requestIdMiddleware())
.addMiddleware(logRequests())
.addHandler(_routerHandler);
_server = await serve(handler, InternetAddress.anyIPv4, port);
print('Server listening on port ${_server?.port}');
}
@override
void stop() {
_server?.close();
}
@override
void addHandler(handler) {
ClassMirror cm = reflectClass(handler.runtimeType);
var im = reflect(handler);
String path = "";
for (var metadata in cm.metadata) {
// 检查元数据是否为RequestMapping类型
if (metadata.reflectee is RequestMapping) {
// 获取实例
RequestMapping annotation = metadata.reflectee;
path = annotation.path;
if (path.endsWith("/")) {
path = path.substring(0, path.length - 1);
}
if (annotation.method == HttpMethod.WS) {
//wwebsocket处理句柄直接加入路由
addRouter(annotation.method, path, handler);
return;
}
break;
}
}
im.type.declarations.forEach((k, v) {
if (v is MethodMirror) {
for (var m in v.metadata) {
if (m.reflectee is RequestMapping) {
var mp = m.reflectee.path as String;
if (mp.startsWith("/")) {
mp = mp.substring(1, mp.length);
}
String p = "$path/$mp";
// print("method: ${m.reflectee.method} $p");
//把地址加入路由
addRouter(m.reflectee.method, p, (req, [a, b, c, d, e, f]) {
return im.invoke(v.simpleName, [req]).reflectee;
});
}
}
}
});
}
void addRouter(HttpMethod method, String path, dynamic handler) {
if (method == HttpMethod.WS) {
//添加websocket连接处理函数
if (handler is WebSocketHandler) {
_wsCall[path] = handler;
} else {
print("回调函数类型错误 需要 WebSocketHandler ");
}
} else {
_router.add(method.name, path, handler);
}
}
}
abstract class WebSocketHandler {
Map<String, dynamic> chanelMap = {};
void open(String id);
void close(String id);
void message(String id, dynamic message);
void sendData(String id, dynamic data) {
if (chanelMap.containsKey(id)) {
chanelMap[id].sink.add(data);
} else {
print("id未找到");
}
}
}

24
pubspec.yaml Normal file
View File

@@ -0,0 +1,24 @@
name: EasyDartModule
description: A starting point for Dart libraries or applications.
version: 1.0.0
# repository: https://github.com/my_org/my_repo
environment:
sdk: ^3.5.4
# Add regular dependencies here.
dependencies:
# path: ^1.8.0
grpc: ^4.0.1
dio: ^5.0.0
mongo_dart: ^0.10.3
shelf: ^1.4.2
shelf_router: ^1.1.4
shelf_web_socket: ^2.0.1
uuid: ^4.5.1
mqtt5_client: ^4.6.2
minio: ^3.5.7
dev_dependencies:
lints: ^4.0.0
test: ^1.24.0