From 6b7782522d82c5e08f474be47446a0888e646844 Mon Sep 17 00:00:00 2001 From: qmqz Date: Wed, 22 Apr 2026 18:06:23 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Ewebsocket=20=E4=B8=BB?= =?UTF-8?q?=E5=8A=A8=E6=96=AD=E5=BC=80=20=20=E5=8A=A0=E5=85=A5=20sqlite?= =?UTF-8?q?=E6=95=B0=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lib/EasyDartModule.dart | 13 +- lib/base/database/DataBase.dart | 16 +- lib/base/database/impl/SqliteDb.dart | 418 ++++++++++++++++++++ lib/base/webserver/impl/ShelfWebServer.dart | 6 +- pubspec.yaml | 1 + 5 files changed, 445 insertions(+), 9 deletions(-) create mode 100644 lib/base/database/impl/SqliteDb.dart diff --git a/lib/EasyDartModule.dart b/lib/EasyDartModule.dart index e676b5f..a699cb1 100644 --- a/lib/EasyDartModule.dart +++ b/lib/EasyDartModule.dart @@ -4,6 +4,7 @@ import 'dart:async'; import 'package:EasyDartModule/base/database/DataBase.dart'; import 'package:EasyDartModule/base/database/impl/MongoDb.dart'; +import 'package:EasyDartModule/base/database/impl/SqliteDb.dart'; import 'package:EasyDartModule/base/discovery/Discovery.dart'; import 'package:EasyDartModule/base/discovery/impl/NacosDiscovery.dart'; import 'package:EasyDartModule/base/http/TraceDio.dart'; @@ -22,7 +23,7 @@ import 'package:event_bus/event_bus.dart'; export 'package:shelf/shelf.dart'; export 'package:mongo_dart/mongo_dart.dart'; -// export 'package:EasyDartModule/base/language/extensions/StringExt.dart'; +export 'package:EasyDartModule/base/database/impl/SqliteDb.dart'; class EasyDartModule { static Discovery get discovery => Discovery.getInstance(); @@ -52,8 +53,14 @@ class EasyDartModule { Discovery.setInstance(NacosDiscovery(discoveryConfig)); } if (dataBaseConfig != null) { - //mongo数据库 - DataBase.setInstance(MongoDb(dataBaseConfig)); + switch (dataBaseConfig.type) { + case DataBaseType.mongo: + DataBase.setInstance(MongoDb(dataBaseConfig)); + break; + case DataBaseType.sqlite: + DataBase.setInstance(SqliteDb.fromConfig(dataBaseConfig)); + break; + } } if (mqttConfig != null) { //mqtt diff --git a/lib/base/database/DataBase.dart b/lib/base/database/DataBase.dart index ba0285b..53a8eec 100644 --- a/lib/base/database/DataBase.dart +++ b/lib/base/database/DataBase.dart @@ -29,14 +29,20 @@ abstract class DataBase { Future count(String tbale, {dynamic condition}); } +enum DataBaseType { mongo, sqlite } + class DataBaseConfig { + DataBaseType type; String host; String userName; String password; String dataBase; - DataBaseConfig( - {required this.host, - required this.userName, - required this.password, - required this.dataBase}); + + DataBaseConfig({ + this.type = DataBaseType.mongo, + this.host = '', + this.userName = '', + this.password = '', + required this.dataBase, + }); } diff --git a/lib/base/database/impl/SqliteDb.dart b/lib/base/database/impl/SqliteDb.dart new file mode 100644 index 0000000..6a76dbf --- /dev/null +++ b/lib/base/database/impl/SqliteDb.dart @@ -0,0 +1,418 @@ +import 'dart:convert'; +import 'dart:typed_data'; + +import 'package:EasyDartModule/base/database/DataBase.dart'; +import 'package:mongo_dart/mongo_dart.dart' show ObjectId; +import 'package:sqlite3/sqlite3.dart'; + +class SqliteDb implements DataBase { + final String path; + late final Database db; + + SqliteDb(this.path) { + db = sqlite3.open(path); + } + + SqliteDb.memory() : path = ':memory:' { + db = sqlite3.openInMemory(); + } + + SqliteDb.fromConfig(DataBaseConfig config) : this(config.dataBase); + + @override + bool isConnected() { + try { + db.select('SELECT 1'); + return true; + } catch (_) { + return false; + } + } + + void close() { + db.dispose(); + } + + void execute(String sql, [List parameters = const []]) { + db.execute(sql, parameters); + } + + @override + Future count(String tbale, {dynamic condition}) async { + final where = _buildWhere(condition); + final ResultSet result; + try { + result = db.select( + 'SELECT COUNT(*) AS count FROM ${_quoteIdentifier(tbale)}${where.sql}', + where.args, + ); + } on SqliteException catch (e) { + if (_isNoSuchTableError(e)) { + return 0; + } + rethrow; + } + return result.first['count'] as int; + } + + @override + Future delete(String table, dynamic condition) async { + final where = _buildWhere(condition); + db.execute( + 'DELETE FROM ${_quoteIdentifier(table)}${where.sql}', + where.args, + ); + } + + @override + Future insert(String table, Map data) async { + if (data.isEmpty) { + throw ArgumentError.value(data, 'data', 'Insert data cannot be empty.'); + } + + data['_id'] ??= ObjectId().oid; + final sqliteData = _toSqliteData(data); + _ensureTableColumns(table, sqliteData); + final columns = sqliteData.keys.map(_quoteIdentifier).join(', '); + final placeholders = List.filled(sqliteData.length, '?').join(', '); + db.execute( + 'INSERT INTO ${_quoteIdentifier(table)} ($columns) VALUES ($placeholders)', + sqliteData.values.toList(), + ); + } + + @override + Future>> query(String table, + {dynamic condition}) async { + final clause = _buildQueryClause(condition); + final ResultSet result; + try { + result = db.select( + 'SELECT * FROM ${_quoteIdentifier(table)}${clause.sql}', + clause.args, + ); + } on SqliteException catch (e) { + if (_isNoSuchTableError(e)) { + return []; + } + rethrow; + } + return result.map((row) => Map.from(row)).toList(); + } + + @override + Future update( + String table, Map data, dynamic condition, + {bool multiUpdate = false}) async { + if (data.isEmpty) { + throw ArgumentError.value(data, 'data', 'Update data cannot be empty.'); + } + + final where = _buildWhere(condition); + if (!multiUpdate && where.sql.isEmpty) { + throw ArgumentError( + 'Updating without a condition requires multiUpdate to be true.'); + } + + final sqliteData = _toSqliteData(data); + _ensureTableColumns(table, { + ..._conditionColumnValues(condition), + ...sqliteData, + }); + final tableName = _quoteIdentifier(table); + final setSql = + sqliteData.keys.map((key) => '${_quoteIdentifier(key)} = ?').join(', '); + final updateWhere = multiUpdate + ? where + : _SqliteClause( + '${where.sql} AND rowid IN (SELECT rowid FROM $tableName${where.sql} LIMIT 1)', + [...where.args, ...where.args], + ); + db.execute( + 'UPDATE $tableName SET $setSql${updateWhere.sql}', + [...sqliteData.values, ...updateWhere.args], + ); + } + + _SqliteClause _buildQueryClause(dynamic condition) { + if (condition is SqliteCondition) { + final where = _buildWhere(condition); + final parts = [where.sql]; + if (condition.orderBy != null && condition.orderBy!.trim().isNotEmpty) { + parts.add(' ORDER BY ${condition.orderBy}'); + } + if (condition.limit != null) { + parts.add(' LIMIT ?'); + } + if (condition.offset != null) { + parts.add(' OFFSET ?'); + } + + return _SqliteClause( + parts.join(), + [ + ...where.args, + if (condition.limit != null) condition.limit, + if (condition.offset != null) condition.offset, + ], + ); + } + + return _buildWhere(condition); + } + + _SqliteClause _buildWhere(dynamic condition) { + if (condition == null) { + return const _SqliteClause('', []); + } + + if (condition is SqliteCondition) { + if (condition.where == null || condition.where!.trim().isEmpty) { + return const _SqliteClause('', []); + } + return _SqliteClause( + ' WHERE ${condition.where}', + condition.whereArgs.map(_toSqliteValue).toList(), + ); + } + + if (condition is String) { + final where = condition.trim(); + if (where.isEmpty) { + return const _SqliteClause('', []); + } + return _SqliteClause( + where.toUpperCase().startsWith('WHERE') ? ' $where' : ' WHERE $where', + const [], + ); + } + + if (condition is Map) { + if (condition.isEmpty) { + return const _SqliteClause('', []); + } + + final parts = []; + final args = []; + condition.forEach((key, value) { + final column = _quoteIdentifier(key); + if (value == null) { + parts.add('$column IS NULL'); + } else if (value is Iterable && + value is! String && + value is! Uint8List) { + final values = value.toList(); + if (values.isEmpty) { + parts.add('1 = 0'); + } else { + parts.add( + '$column IN (${List.filled(values.length, '?').join(', ')})'); + args.addAll(values.map(_toSqliteValue)); + } + } else { + parts.add('$column = ?'); + args.add(_toSqliteValue(value)); + } + }); + + return _SqliteClause(' WHERE ${parts.join(' AND ')}', args); + } + + throw ArgumentError.value( + condition, + 'condition', + 'Condition must be null, String, Map, or SqliteCondition.', + ); + } + + String _quoteIdentifier(String identifier) { + if (identifier.trim().isEmpty) { + throw ArgumentError.value( + identifier, 'identifier', 'Identifier is empty.'); + } + return identifier + .split('.') + .map((part) => '"${part.replaceAll('"', '""')}"') + .join('.'); + } + + void _ensureTableColumns(String table, Map columnValues) { + final columns = _normalizeColumnValues(columnValues); + if (columns.isEmpty) { + return; + } + + if (!_tableExists(table)) { + final definitions = columns.entries + .map((entry) => _columnDefinition(entry.key, entry.value)) + .join(', '); + db.execute('CREATE TABLE IF NOT EXISTS ${_quoteIdentifier(table)} ' + '($definitions)'); + _ensureIdUnique(table); + return; + } + + final existingColumns = _tableColumns(table); + for (final entry in columns.entries) { + if (existingColumns.contains(entry.key)) { + continue; + } + db.execute( + 'ALTER TABLE ${_quoteIdentifier(table)} ADD COLUMN ' + '${_columnDefinition(entry.key, entry.value, allowPrimaryKey: false)}', + ); + } + _ensureIdUnique(table); + } + + Map _normalizeColumnValues(Map values) { + final columns = {}; + values.forEach((key, value) { + if (key.trim().isEmpty) { + return; + } + columns[key] = _toSqliteValue(_representativeColumnValue(value)); + }); + return columns; + } + + dynamic _representativeColumnValue(dynamic value) { + if (value is Iterable && value is! String && value is! Uint8List) { + return value.cast().firstWhere( + (item) => item != null, + orElse: () => null, + ); + } + return value; + } + + Map _conditionColumnValues(dynamic condition) { + if (condition is! Map) { + return const {}; + } + return condition; + } + + bool _tableExists(String table) { + final result = db.select( + "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?", + [table], + ); + return result.isNotEmpty; + } + + Set _tableColumns(String table) { + final result = db.select('PRAGMA table_info(${_quoteIdentifier(table)})'); + return result.map((row) => row['name'] as String).toSet(); + } + + String _columnDefinition(String name, dynamic value, + {bool allowPrimaryKey = true}) { + if (name == '_id') { + return '${_quoteIdentifier(name)} TEXT' + '${allowPrimaryKey ? ' PRIMARY KEY' : ''}'; + } + return '${_quoteIdentifier(name)} ${_sqliteType(value)}'; + } + + String _sqliteType(dynamic value) { + if (value is int || value is bool) { + return 'INTEGER'; + } + if (value is double || value is num) { + return 'REAL'; + } + if (value is Uint8List) { + return 'BLOB'; + } + return 'TEXT'; + } + + Map _toSqliteData(Map data) { + return data.map((key, value) => MapEntry(key, _toSqliteValue(value))); + } + + dynamic _toSqliteValue(dynamic value) { + if (value == null || + value is int || + value is double || + value is String || + value is Uint8List) { + return value; + } + if (value is bool) { + return value ? 1 : 0; + } + if (value is num) { + return value.toDouble(); + } + if (value is DateTime) { + return value.toIso8601String(); + } + if (value is ObjectId) { + return value.oid; + } + if (value is Map || value is Iterable) { + return _jsonEncodeValue(value); + } + return value.toString(); + } + + String _jsonEncodeValue(dynamic value) { + try { + return jsonEncode(value, toEncodable: _jsonEncodableValue); + } catch (_) { + return value.toString(); + } + } + + dynamic _jsonEncodableValue(dynamic value) { + final sqliteValue = _toSqliteValue(value); + if (sqliteValue is Uint8List) { + return base64Encode(sqliteValue); + } + return sqliteValue; + } + + void _ensureIdUnique(String table) { + if (!_tableColumns(table).contains('_id')) { + return; + } + db.execute( + 'CREATE UNIQUE INDEX IF NOT EXISTS ${_quoteIdentifier(_idIndexName(table))} ' + 'ON ${_quoteIdentifier(table)} (${_quoteIdentifier('_id')})', + ); + } + + String _idIndexName(String table) { + final safeTableName = table.replaceAll(RegExp(r'[^A-Za-z0-9_]'), '_'); + return 'idx_${safeTableName}_id_unique'; + } + + bool _isNoSuchTableError(SqliteException error) { + return error.message.toLowerCase().contains('no such table'); + } +} + +class SqliteCondition { + final String? where; + final List whereArgs; + final String? orderBy; + final int? limit; + final int? offset; + + const SqliteCondition({ + this.where, + this.whereArgs = const [], + this.orderBy, + this.limit, + this.offset, + }); +} + +class _SqliteClause { + final String sql; + final List args; + + const _SqliteClause(this.sql, this.args); +} diff --git a/lib/base/webserver/impl/ShelfWebServer.dart b/lib/base/webserver/impl/ShelfWebServer.dart index 36bf16b..4f8e80a 100644 --- a/lib/base/webserver/impl/ShelfWebServer.dart +++ b/lib/base/webserver/impl/ShelfWebServer.dart @@ -351,7 +351,11 @@ abstract class WebSocketHandler { void open(String id); void close(String id); void message(String id, dynamic message); - + void forceClose(String id, {String? msg}) { + if (chanelMap.containsKey(id)) { + chanelMap[id].sink.close(4000, msg); + } + } void sendData(String id, dynamic data) { if (chanelMap.containsKey(id)) { chanelMap[id].sink.add(data); diff --git a/pubspec.yaml b/pubspec.yaml index 1c0d7f6..355afe1 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -12,6 +12,7 @@ dependencies: grpc: ^4.0.1 dio: ^5.0.0 mongo_dart: ^0.10.3 + sqlite3: ^2.4.7 shelf: ^1.4.2 shelf_router: ^1.1.4 shelf_web_socket: ^2.0.1