import sqlite3 from files_sort import GetFilesDBNodeInfo def GetFolderID( folder_path: str, db_path: str = "../src/db_ntfs_info.db", table_name: str = "db_path" ) -> int | None: """ 根据文件夹路径,查询数据库中该文件夹对应的 ID。 :param folder_path: 文件夹路径(如 r"CloudMusic\\") :param db_path: 数据库文件路径 :param table_name: 要查询的数据表名称,默认为 'db_path' :return: 成功则返回 ID(int),失败返回 None """ conn = sqlite3.connect(db_path) cursor = conn.cursor() try: # 使用 table_name 构建 SQL 查询 sql = f"SELECT ID FROM {table_name} WHERE Path = ?" cursor.execute(sql, (folder_path,)) result = cursor.fetchone() if result: return result[0] else: print(f"未找到路径:{folder_path} 在表 {table_name} 中") return None except sqlite3.Error as e: print(f"数据库操作失败:{e}") return None finally: conn.close() def GetNodeFragmentsByParentID( parent_id: int, db_path: str = "../src/db_ntfs_info.db", table_name: str = "db_node" ) -> list: """ 根据 ParentID 查询 db_node 表中对应的文件/子目录的分片信息。 :param parent_id: 父节点 ID :param db_path: 数据库文件路径 :param table_name: 数据表名称 :return: 包含 PathID、分片数量及分片信息的列表 """ conn = sqlite3.connect(db_path) cursor = conn.cursor() # 构建查询语句(根据分片字段) sql = f""" SELECT PathID, ExtentCount, extent1_Location, extent1_Length, extent2_Location, extent2_Length, extent3_Location, extent3_Length, extent4_Location, extent4_Length FROM {table_name} WHERE ParentID = ? """ try: cursor.execute(sql, (parent_id,)) rows = cursor.fetchall() except Exception as e: print(f"数据库查询失败:{e}") return [] results = [] for row in rows: path_id = row[0] extent_count = row[1] fragments = [] for i in range(4): location = row[2 + i * 2] length = row[3 + i * 2] if location is not None and length is not None and length > 0: fragments.append({ 'start_byte': location, 'length': length }) results.append({ 'path_id': path_id, 'extent_count': extent_count, 'fragments': fragments }) conn.close() return results def GetSubPathsByParentID( parent_id: int, db_path: str = "../src/db_ntfs_info.db", table_name: str = "db_path" ) -> list: """ 根据 ParentID 查询 db_path 表中对应的子项(文件/文件夹)。 :param parent_id: 父节点 ID :param db_path: 数据库文件路径 :param table_name: 数据表名称 :return: 包含 ID、Path、Name 的字典列表 """ conn = sqlite3.connect(db_path) cursor = conn.cursor() sql = f""" SELECT ID, Path, Name FROM {table_name} WHERE ParentID = ? """ try: cursor.execute(sql, (parent_id,)) rows = cursor.fetchall() except Exception as e: print(f"数据库查询失败:{e}") return [] results = [] for row in rows: item = { 'id': row[0], 'absolute_path': row[1], # 因为 Path 已经是完整格式(如 CloudMusic\\AGA - MIZU.mp3) 'name': row[2] } results.append(item) conn.close() return results if __name__ == "__main__": test_folder_path = "pictures/" parent_id_test = GetFolderID(test_folder_path) # node_data = GetNodeFragmentsByParentID(parent_id_test) path_data = GetSubPathsByParentID(parent_id_test) node_data = GetFilesDBNodeInfo(path_records=path_data) for data in node_data: print(data)