From d4a411ce68b2b40e71e8d75961f19230b4c8bfde Mon Sep 17 00:00:00 2001 From: Burgess Leo <1799594843@qq.com> Date: Fri, 23 May 2025 13:54:31 +0800 Subject: [PATCH] almost finish --- fake_main.py | 29 +- .../folders_save.py | 0 files_utils/folders_sort.py | 263 ++++++++++++++++++ test/files_sort.py | 156 +++++------ test/folders_save.py | 154 ---------- test/folders_sort.py | 199 +++++++++++++ 6 files changed, 559 insertions(+), 242 deletions(-) rename db_manage/__init__.py => files_utils/folders_save.py (100%) create mode 100644 files_utils/folders_sort.py delete mode 100644 test/folders_save.py create mode 100644 test/folders_sort.py diff --git a/fake_main.py b/fake_main.py index 90229a8..7f46b32 100644 --- a/fake_main.py +++ b/fake_main.py @@ -1,19 +1,28 @@ +import itertools + from files_utils.files_save import CopySingleFragmentFiles, CopyMultiFragmentFiles from files_utils.files_sort import GetSortFragments +from files_utils.folders_sort import ClassifyFilesAndFolders, ScanMultiFolders fragment_lists = {} -test_files = [ - r"CloudMusic/AGA - MIZU.mp3", - r"CloudMusic/AGA - 一.mp3", - r"CloudMusic/Aaron Zigman - Main Title.mp3", - r"CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3", - r"CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3", - r"CloudMusic/Ava Max - Sweet but Psycho.mp3", - r"CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3", - r"CloudMusic/Color Music Choir - Something Just Like This (Live).mp3" +mix_test_data = [ + "CloudMusic\\AGA - MIZU.mp3", + "CloudMusic/AGA - 一.mp3", + "CloudMusic/Aaron Zigman - Main Title.mp3", + "CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3", + "CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3", + "CloudMusic/Ava Max - Sweet but Psycho.mp3", + "CloudMusic\\", + "folder1/", + "CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3", + "CloudMusic/Color Music Choir - Something Just Like This (Live).mp3" ] +classify_files_and_folders = ClassifyFilesAndFolders(mix_test_data) +files_list = classify_files_and_folders["files"] +folders_files_list = ScanMultiFolders(classify_files_and_folders["folders"]) +merged_list = list(itertools.chain(files_list, folders_files_list)) -sort_fragments = GetSortFragments(db_path="./src/db_ntfs_info.db", files_list=test_files) +sort_fragments = GetSortFragments(db_path="./src/db_ntfs_info.db", files_list=merged_list) for item in sort_fragments: extent_count = item['extent_count'] if extent_count == 1: diff --git a/db_manage/__init__.py b/files_utils/folders_save.py similarity index 100% rename from db_manage/__init__.py rename to files_utils/folders_save.py diff --git a/files_utils/folders_sort.py b/files_utils/folders_sort.py new file mode 100644 index 0000000..462ca5d --- /dev/null +++ b/files_utils/folders_sort.py @@ -0,0 +1,263 @@ +import os +import sqlite3 + +from files_utils.files_sort import GetFilesDBNodeInfo, SortFragmentsByStartByte +from ntfs_utils.main import volume_letter + + +def GetFolderID( + folder_path: str, + db_path: str = "../src/db_ntfs_info.db", + table_name: str = "db_path" +) -> int | None: + """ + 根据文件夹路径,查询数据库中该文件夹对应的 ID。 + + :param folder_path: 文件夹路径(如 r"CloudMusic\\") + :param db_path: 数据库文件路径 + :param table_name: 要查询的数据表名称,默认为 'db_path' + :return: 成功则返回 ID(int),失败返回 None + """ + + conn = sqlite3.connect(db_path) + cursor = conn.cursor() + + try: + # 使用 table_name 构建 SQL 查询 + sql = f"SELECT ID FROM {table_name} WHERE Path = ?" + cursor.execute(sql, (folder_path,)) + result = cursor.fetchone() + + if result: + return result[0] + else: + print(f"未找到路径:{folder_path} 在表 {table_name} 中") + return None + + except sqlite3.Error as e: + print(f"数据库操作失败:{e}") + return None + + finally: + conn.close() + + +def GetSubPathsByParentID( + parent_id: int, + db_path: str = "../src/db_ntfs_info.db", + table_name: str = "db_path" +) -> list: + """ + 根据 ParentID 查询 db_path 表中对应的子项(文件/文件夹)。 + + :param parent_id: 父节点 ID + :param db_path: 数据库文件路径 + :param table_name: 数据表名称 + :return: 包含 ID、Path、Name 的字典列表 + """ + conn = sqlite3.connect(db_path) + cursor = conn.cursor() + + sql = f""" + SELECT ID, Path, Name + FROM {table_name} + WHERE ParentID = ? + """ + + try: + cursor.execute(sql, (parent_id,)) + rows = cursor.fetchall() + except Exception as e: + print(f"数据库查询失败:{e}") + return [] + + results = [] + for row in rows: + item = { + 'id': row[0], + 'absolute_path': row[1], + 'name': row[2] + } + results.append(item) + + conn.close() + return results + + +def GetSortFragmentsByFolderPath(db_path: str = "../src/db_ntfs_info.db", folder_path: str = None) -> list: + """ + 根据文件夹路径,查询数据库中该文件夹下的所有文件的分片信息。 + :param db_path: 要查询的数据库 + :param folder_path: 文件夹的绝对路径 + :return list: 文件夹下所有文件按片段顺序排列的列表 + """ + parent_id = GetFolderID(folder_path=folder_path, db_path=db_path) + path_data = GetSubPathsByParentID(parent_id=parent_id, db_path=db_path) + node_data = GetFilesDBNodeInfo(path_records=path_data) + result = SortFragmentsByStartByte(node_data) + + return result + + +# if __name__ == "__main__": +# folder_path_test = "pictures/" +# data = GetSortFragmentsByFolderPath(db_path="../src/db_ntfs_info.db", folder_path=folder_path_test) +# for item in data: +# print(item) + + +def ScanDirectory(root_dir, skip_system=True): + """ + 递归扫描指定目录,返回相对于盘符的路径列表(使用 '/' 分隔),不包含盘符。 + + :param root_dir: 要扫描的根目录路径 + :param skip_system: 是否跳过系统目录(默认 True) + :return: 文件路径列表,格式为 relative/path/to/file.ext + """ + file_list = [] + + for root, dirs, files in os.walk(root_dir): + # 跳过系统目录 + if skip_system: + dirs[:] = [d for d in dirs if not d.startswith('$') and d != "System Volume Information"] + + for file in files: + full_path = os.path.join(root, file) + + # 去掉盘符 + _, relative_path = os.path.splitdrive(full_path) + + # 替换 \ 为 / + relative_path = relative_path.lstrip("\\").replace("\\", "/") + + file_list.append(relative_path) + + return file_list + + +# if __name__ == "__main__": +# folder_path = r"Y:/folder1/" +# files_list = ScanDirectory(folder_path) +# +# print(f"共找到 {len(files_list)} 个文件:") +# for f in files_list: +# print(f) + + +def ScanMultiFolders(folder_paths_list: list, skip_system: bool = True) -> list: + """ + 扫描多个根目录,返回所有文件的相对路径列表(格式为 folder/file.ext)。 + + :param folder_paths_list: 要扫描的根目录列表(如 ["CloudMusic/", "folder1/"]) + :param skip_system: 是否跳过系统目录 + :return: 文件路径列表(统一格式为 folder/file.ext) + """ + all_files = [] + + for root_dir in folder_paths_list: + # 规范化输入路径,确保结尾有 '/'(如果是目录) + normalized_root_dir = root_dir.replace("\\", "/") + if not normalized_root_dir.endswith("/"): + normalized_root_dir += "/" # 确保结尾 / + + full_root_path = f"{volume_letter}:/{normalized_root_dir}" + full_root_path = os.path.normpath(full_root_path) + + if not os.path.exists(full_root_path): + print(f"⚠️ 路径不存在:{full_root_path}") + continue + + for root, dirs, files in os.walk(full_root_path): + if skip_system: + dirs[:] = [d for d in dirs if not d.startswith('$') and d != "System Volume Information"] + + for file in files: + full_file_path = os.path.join(root, file) + + # 去掉盘符 + _, relative_path = os.path.splitdrive(full_file_path) + + # 去除开头和结尾的 '\' 或 '/' 并替换分隔符 + normalized_path = relative_path.strip("\\").strip("/").replace("\\", "/") + + all_files.append(normalized_path) + + return all_files + + +# if __name__ == "__main__": +# folders = [ +# "CloudMusic\\", +# "folder1/" +# ] +# +# files = ScanMultiFolders(folders) +# +# print(f"共找到 {len(files)} 个文件:") +# for f in files: +# print(f) + + +def ClassifyFilesAndFolders(paths: list) -> dict: + """ + 将传入的路径列表分类为文件和目录,并统一使用 '/' 分隔符。 + 确保目录路径以 '/' 结尾。 + + :param paths: 路径列表(元素可以是文件或目录) + :return: 包含 'files' 和 'directories' 的字典,路径格式统一为 '/' + """ + files = [] + directories = [] + + for path in paths: + # 统一用 '/' 分隔符,并保留原始结构(是否以 '/' 结尾) + normalized_path = path.replace("\\", "/") + + # 判断是否原本是目录(以 '/' 或 '\' 结尾) + is_potential_dir = normalized_path.endswith("/") + + # 拼接完整路径用于判断是否存在 + full_path = f"{volume_letter}:/{normalized_path.lstrip('/')}" + full_path = os.path.normpath(full_path) + + if os.path.isfile(full_path): + # 如果是文件,去掉结尾的 /(如果有的话) + if normalized_path.endswith("/"): + normalized_path = normalized_path.rstrip("/") + files.append(normalized_path) + elif os.path.isdir(full_path): + # 如果是目录,确保以 '/' 结尾 + if not normalized_path.endswith("/"): + normalized_path += "/" + directories.append(normalized_path) + else: + print(f"⚠️ 路径不存在或类型未知:{normalized_path}") + + return { + 'files': files, + 'folders': directories + } + +# if __name__ == "__main__": +# test_paths = [ +# "CloudMusic\\AGA - MIZU.mp3", +# "CloudMusic/AGA - 一.mp3", +# "CloudMusic/Aaron Zigman - Main Title.mp3", +# "CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3", +# "CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3", +# "CloudMusic/Ava Max - Sweet but Psycho.mp3", +# "CloudMusic\\", +# "folder1/", +# "CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3", +# "CloudMusic/Color Music Choir - Something Just Like This (Live).mp3" +# ] +# +# result = ClassifyFilesAndFolders(test_paths) +# +# print("✅ 文件列表:") +# for f in result['files']: +# print(f) +# +# print("\n📁 文件夹列表:") +# for d in result['directories']: +# print(d) diff --git a/test/files_sort.py b/test/files_sort.py index d1e6906..1232b39 100644 --- a/test/files_sort.py +++ b/test/files_sort.py @@ -43,14 +43,14 @@ def GetFilesDBPathInfo(db_path: str = "../src/db_ntfs_info.db", # if __name__ == "__main__": # test_files = [ -# r"Y:\CloudMusic\AGA - MIZU.mp3", -# r"Y:\CloudMusic\AGA - 一.mp3", -# r"Y:\CloudMusic\Aaron Zigman - Main Title.mp3", -# r"Y:\CloudMusic\Anson Seabra - Keep Your Head Up Princess.mp3", -# r"Y:\CloudMusic\Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3", -# r"Y:\CloudMusic\Ava Max - Sweet but Psycho.mp3", -# r"Y:\CloudMusic\Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3", -# r"Y:\CloudMusic\Color Music Choir - Something Just Like This (Live).mp3" +# r"CloudMusic/AGA - MIZU.mp3", +# r"CloudMusic/AGA - 一.mp3", +# r"CloudMusic/Aaron Zigman - Main Title.mp3", +# r"CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3", +# r"CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3", +# r"CloudMusic/Ava Max - Sweet but Psycho.mp3", +# r"CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3", +# r"CloudMusic/Color Music Choir - Something Just Like This (Live).mp3" # ] # # result = GetFilesDBPathInfo(files_path=test_files) @@ -123,27 +123,27 @@ def GetFilesDBNodeInfo(db_path: str = "../src/db_ntfs_info.db", table_name: str return results -# if __name__ == "__main__": -# test_files = [ -# r"Y:\CloudMusic\AGA - MIZU.mp3", -# r"Y:\CloudMusic\AGA - 一.mp3", -# r"Y:\CloudMusic\Aaron Zigman - Main Title.mp3", -# r"Y:\CloudMusic\Anson Seabra - Keep Your Head Up Princess.mp3", -# r"Y:\CloudMusic\Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3", -# r"Y:\CloudMusic\Ava Max - Sweet but Psycho.mp3", -# r"Y:\CloudMusic\Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3", -# r"Y:\CloudMusic\Color Music Choir - Something Just Like This (Live).mp3" -# ] -# -# # 第一步:获取 db_path 表中的 ID 和 Name -# path_info = GetFilesDBPathInfo(files_path=test_files) -# -# # 第二步:根据 PathID 查询 db_node 表中的分片信息 -# file_extents_info = GetFilesDBNodeInfo(path_records=path_info) -# -# # 打印结果 -# for item in file_extents_info: -# print(item) +if __name__ == "__main__": + test_files = [ + r"CloudMusic/AGA - MIZU.mp3", + r"CloudMusic/AGA - 一.mp3", + r"CloudMusic/Aaron Zigman - Main Title.mp3", + r"CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3", + r"CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3", + r"CloudMusic/Ava Max - Sweet but Psycho.mp3", + r"CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3", + r"CloudMusic/Color Music Choir - Something Just Like This (Live).mp3" + ] + + # 第一步:获取 db_path 表中的 ID 和 Name + path_info = GetFilesDBPathInfo(files_path=test_files) + + # 第二步:根据 PathID 查询 db_node 表中的分片信息 + file_extents_info = GetFilesDBNodeInfo(path_records=path_info) + + # 打印结果 + for item in file_extents_info: + print(item) def sort_fragments_by_start_byte(file_extents_list: list) -> list: @@ -181,52 +181,52 @@ def sort_fragments_by_start_byte(file_extents_list: list) -> list: return all_fragments -if __name__ == "__main__": - test_files = [ - r"CloudMusic\AGA - MIZU.mp3", - r"CloudMusic\AGA - 一.mp3", - r"CloudMusic\Aaron Zigman - Main Title.mp3", - r"CloudMusic\Anson Seabra - Keep Your Head Up Princess.mp3", - r"CloudMusic\Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3", - r"CloudMusic\Ava Max - Sweet but Psycho.mp3", - r"CloudMusic\Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3", - r"CloudMusic\Color Music Choir - Something Just Like This (Live).mp3" - ] - test_files_sort = [ - {'absolute_path': 'CloudMusic\\AGA - MIZU.mp3', 'name': 'AGA - MIZU.mp3', 'path_id': 6, 'extent_count': 1, - 'fragments': [{'start_byte': 694849536, 'length': 8126464}]}, - {'absolute_path': 'CloudMusic\\AGA - 一.mp3', 'name': 'AGA - 一.mp3', 'path_id': 7, 'extent_count': 2, - 'fragments': [{'start_byte': 702976000, 'length': 10870784}, {'start_byte': 23162880, 'length': 69632}]}, - {'absolute_path': 'CloudMusic\\Aaron Zigman - Main Title.mp3', 'name': 'Aaron Zigman - Main Title.mp3', - 'path_id': 5, 'extent_count': 1, 'fragments': [{'start_byte': 687685632, 'length': 7163904}]}, - {'absolute_path': 'CloudMusic\\Anson Seabra - Keep Your Head Up Princess.mp3', - 'name': 'Anson Seabra - Keep Your Head Up Princess.mp3', 'path_id': 8, 'extent_count': 1, - 'fragments': [{'start_byte': 713846784, 'length': 7970816}]}, - {'absolute_path': 'CloudMusic\\Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3', - 'name': 'Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3', 'path_id': 9, - 'extent_count': 1, 'fragments': [{'start_byte': 721817600, 'length': 9179136}]}, - {'absolute_path': 'CloudMusic\\Ava Max - Sweet but Psycho.mp3', 'name': 'Ava Max - Sweet but Psycho.mp3', - 'path_id': 10, 'extent_count': 1, 'fragments': [{'start_byte': 731000832, 'length': 7938048}]}, - {'absolute_path': 'CloudMusic\\Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3', - 'name': 'Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3', 'path_id': 11, 'extent_count': 1, - 'fragments': [{'start_byte': 738938880, 'length': 6791168}]}, - {'absolute_path': 'CloudMusic\\Color Music Choir - Something Just Like This (Live).mp3', - 'name': 'Color Music Choir - Something Just Like This (Live).mp3', 'path_id': 12, 'extent_count': 1, - 'fragments': [{'start_byte': 745730048, 'length': 6193152}]}] - - path_info = GetFilesDBPathInfo(files_path=test_files) - file_extents_data = GetFilesDBNodeInfo(path_records=path_info) - - # 根据文件片段先后排序 - single_fragment_result = sort_fragments_by_start_byte(file_extents_data) - - # 模拟多文件片段,根据文件片段先后排序 - multi_fragment_result = sort_fragments_by_start_byte(test_files_sort) - - print("单文件片段排序结果:") - for item in single_fragment_result: - print(item) - - print("\n多文件片段排序结果:") - for item in multi_fragment_result: - print(item) +# if __name__ == "__main__": +# test_files = [ +# r"CloudMusic/AGA - MIZU.mp3", +# r"CloudMusic/AGA - 一.mp3", +# r"CloudMusic/Aaron Zigman - Main Title.mp3", +# r"CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3", +# r"CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3", +# r"CloudMusic/Ava Max - Sweet but Psycho.mp3", +# r"CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3", +# r"CloudMusic/Color Music Choir - Something Just Like This (Live).mp3" +# ] +# test_files_sort = [ +# {'absolute_path': 'CloudMusic/AGA - MIZU.mp3', 'name': 'AGA - MIZU.mp3', 'path_id': 6, 'extent_count': 1, +# 'fragments': [{'start_byte': 694849536, 'length': 8126464}]}, +# {'absolute_path': 'CloudMusic/AGA - 一.mp3', 'name': 'AGA - 一.mp3', 'path_id': 7, 'extent_count': 2, +# 'fragments': [{'start_byte': 702976000, 'length': 10870784}, {'start_byte': 23162880, 'length': 69632}]}, +# {'absolute_path': 'CloudMusic/Aaron Zigman - Main Title.mp3', 'name': 'Aaron Zigman - Main Title.mp3', +# 'path_id': 5, 'extent_count': 1, 'fragments': [{'start_byte': 687685632, 'length': 7163904}]}, +# {'absolute_path': 'CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3', +# 'name': 'Anson Seabra - Keep Your Head Up Princess.mp3', 'path_id': 8, 'extent_count': 1, +# 'fragments': [{'start_byte': 713846784, 'length': 7970816}]}, +# {'absolute_path': 'CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3', +# 'name': 'Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3', 'path_id': 9, +# 'extent_count': 1, 'fragments': [{'start_byte': 721817600, 'length': 9179136}]}, +# {'absolute_path': 'CloudMusic/Ava Max - Sweet but Psycho.mp3', 'name': 'Ava Max - Sweet but Psycho.mp3', +# 'path_id': 10, 'extent_count': 1, 'fragments': [{'start_byte': 731000832, 'length': 7938048}]}, +# {'absolute_path': 'CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3', +# 'name': 'Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3', 'path_id': 11, 'extent_count': 1, +# 'fragments': [{'start_byte': 738938880, 'length': 6791168}]}, +# {'absolute_path': 'CloudMusic/Color Music Choir - Something Just Like This (Live).mp3', +# 'name': 'Color Music Choir - Something Just Like This (Live).mp3', 'path_id': 12, 'extent_count': 1, +# 'fragments': [{'start_byte': 745730048, 'length': 6193152}]}] +# +# path_info = GetFilesDBPathInfo(files_path=test_files) +# file_extents_data = GetFilesDBNodeInfo(path_records=path_info) +# +# # 根据文件片段先后排序 +# single_fragment_result = sort_fragments_by_start_byte(file_extents_data) +# +# # 模拟多文件片段,根据文件片段先后排序 +# multi_fragment_result = sort_fragments_by_start_byte(test_files_sort) +# +# print("单文件片段排序结果:") +# for item in single_fragment_result: +# print(item) +# +# print("\n多文件片段排序结果:") +# for item in multi_fragment_result: +# print(item) diff --git a/test/folders_save.py b/test/folders_save.py deleted file mode 100644 index 6f572cd..0000000 --- a/test/folders_save.py +++ /dev/null @@ -1,154 +0,0 @@ -import sqlite3 - -from files_sort import GetFilesDBNodeInfo - - -def GetFolderID( - folder_path: str, - db_path: str = "../src/db_ntfs_info.db", - table_name: str = "db_path" -) -> int | None: - """ - 根据文件夹路径,查询数据库中该文件夹对应的 ID。 - - :param folder_path: 文件夹路径(如 r"CloudMusic\\") - :param db_path: 数据库文件路径 - :param table_name: 要查询的数据表名称,默认为 'db_path' - :return: 成功则返回 ID(int),失败返回 None - """ - - conn = sqlite3.connect(db_path) - cursor = conn.cursor() - - try: - # 使用 table_name 构建 SQL 查询 - sql = f"SELECT ID FROM {table_name} WHERE Path = ?" - cursor.execute(sql, (folder_path,)) - result = cursor.fetchone() - - if result: - return result[0] - else: - print(f"未找到路径:{folder_path} 在表 {table_name} 中") - return None - - except sqlite3.Error as e: - print(f"数据库操作失败:{e}") - return None - - finally: - conn.close() - - -def GetNodeFragmentsByParentID( - parent_id: int, - db_path: str = "../src/db_ntfs_info.db", - table_name: str = "db_node" -) -> list: - """ - 根据 ParentID 查询 db_node 表中对应的文件/子目录的分片信息。 - - :param parent_id: 父节点 ID - :param db_path: 数据库文件路径 - :param table_name: 数据表名称 - :return: 包含 PathID、分片数量及分片信息的列表 - """ - conn = sqlite3.connect(db_path) - cursor = conn.cursor() - - # 构建查询语句(根据分片字段) - sql = f""" - SELECT - PathID, ExtentCount, - extent1_Location, extent1_Length, - extent2_Location, extent2_Length, - extent3_Location, extent3_Length, - extent4_Location, extent4_Length - FROM {table_name} - WHERE ParentID = ? - """ - - try: - cursor.execute(sql, (parent_id,)) - rows = cursor.fetchall() - except Exception as e: - print(f"数据库查询失败:{e}") - return [] - - results = [] - - for row in rows: - path_id = row[0] - extent_count = row[1] - - fragments = [] - for i in range(4): - location = row[2 + i * 2] - length = row[3 + i * 2] - - if location is not None and length is not None and length > 0: - fragments.append({ - 'start_byte': location, - 'length': length - }) - - results.append({ - 'path_id': path_id, - 'extent_count': extent_count, - 'fragments': fragments - }) - - conn.close() - return results - - -def GetSubPathsByParentID( - parent_id: int, - db_path: str = "../src/db_ntfs_info.db", - table_name: str = "db_path" -) -> list: - """ - 根据 ParentID 查询 db_path 表中对应的子项(文件/文件夹)。 - - :param parent_id: 父节点 ID - :param db_path: 数据库文件路径 - :param table_name: 数据表名称 - :return: 包含 ID、Path、Name 的字典列表 - """ - conn = sqlite3.connect(db_path) - cursor = conn.cursor() - - sql = f""" - SELECT ID, Path, Name - FROM {table_name} - WHERE ParentID = ? - """ - - try: - cursor.execute(sql, (parent_id,)) - rows = cursor.fetchall() - except Exception as e: - print(f"数据库查询失败:{e}") - return [] - - results = [] - for row in rows: - item = { - 'id': row[0], - 'absolute_path': row[1], # 因为 Path 已经是完整格式(如 CloudMusic\\AGA - MIZU.mp3) - 'name': row[2] - } - results.append(item) - - conn.close() - return results - - -if __name__ == "__main__": - test_folder_path = "pictures/" - parent_id_test = GetFolderID(test_folder_path) - # node_data = GetNodeFragmentsByParentID(parent_id_test) - path_data = GetSubPathsByParentID(parent_id_test) - node_data = GetFilesDBNodeInfo(path_records=path_data) - for data in node_data: - print(data) diff --git a/test/folders_sort.py b/test/folders_sort.py new file mode 100644 index 0000000..9e086c4 --- /dev/null +++ b/test/folders_sort.py @@ -0,0 +1,199 @@ +import os +import sqlite3 + +from files_sort import GetFilesDBNodeInfo, sort_fragments_by_start_byte + + +def GetFolderID( + folder_path: str, + db_path: str = "../src/db_ntfs_info.db", + table_name: str = "db_path" +) -> int | None: + """ + 根据文件夹路径,查询数据库中该文件夹对应的 ID。 + + :param folder_path: 文件夹路径(如 r"CloudMusic\\") + :param db_path: 数据库文件路径 + :param table_name: 要查询的数据表名称,默认为 'db_path' + :return: 成功则返回 ID(int),失败返回 None + """ + + conn = sqlite3.connect(db_path) + cursor = conn.cursor() + + try: + # 使用 table_name 构建 SQL 查询 + sql = f"SELECT ID FROM {table_name} WHERE Path = ?" + cursor.execute(sql, (folder_path,)) + result = cursor.fetchone() + + if result: + return result[0] + else: + print(f"未找到路径:{folder_path} 在表 {table_name} 中") + return None + + except sqlite3.Error as e: + print(f"数据库操作失败:{e}") + return None + + finally: + conn.close() + + +def GetSubPathsByParentID( + parent_id: int, + db_path: str = "../src/db_ntfs_info.db", + table_name: str = "db_path" +) -> list: + """ + 根据 ParentID 查询 db_path 表中对应的子项(文件/文件夹)。 + + :param parent_id: 父节点 ID + :param db_path: 数据库文件路径 + :param table_name: 数据表名称 + :return: 包含 ID、Path、Name 的字典列表 + """ + conn = sqlite3.connect(db_path) + cursor = conn.cursor() + + sql = f""" + SELECT ID, Path, Name + FROM {table_name} + WHERE ParentID = ? + """ + + try: + cursor.execute(sql, (parent_id,)) + rows = cursor.fetchall() + except Exception as e: + print(f"数据库查询失败:{e}") + return [] + + results = [] + for row in rows: + item = { + 'id': row[0], + 'absolute_path': row[1], + 'name': row[2] + } + results.append(item) + + conn.close() + return results + + +if __name__ == "__main__": + test_folder_path = "pictures/" + parent_id_test = GetFolderID(test_folder_path) + # node_data = GetNodeFragmentsByParentID(parent_id_test) + path_data = GetSubPathsByParentID(parent_id_test) + node_data = GetFilesDBNodeInfo(path_records=path_data) + for data in node_data: + print(data) + + +def GetSortFragmentsByFolderPath(db_path: str = "../src/db_ntfs_info.db", folder_path: str = None) -> list: + """ + 根据文件夹路径,查询数据库中该文件夹下的所有文件的分片信息。 + :param db_path: 要查询的数据库 + :param folder_path: 文件夹的绝对路径 + :return list: 文件夹下所有文件按片段顺序排列的列表 + """ + parent_id = GetFolderID(folder_path=folder_path, db_path=db_path) + path_data = GetSubPathsByParentID(parent_id=parent_id, db_path=db_path) + node_data = GetFilesDBNodeInfo(path_records=path_data) + result = sort_fragments_by_start_byte(node_data) + + return result + + +# if __name__ == "__main__": +# folder_path_test = "pictures/" +# data = GetSortFragmentsByFolderPath(db_path="../src/db_ntfs_info.db", folder_path=folder_path_test) +# for item in data: +# print(item) + + +def ScanDirectory(root_dir, skip_system=True): + """ + 递归扫描指定目录,返回相对于盘符的路径列表(使用 '/' 分隔),不包含盘符。 + + :param root_dir: 要扫描的根目录路径 + :param skip_system: 是否跳过系统目录(默认 True) + :return: 文件路径列表,格式为 relative/path/to/file.ext + """ + file_list = [] + + for root, dirs, files in os.walk(root_dir): + # 跳过系统目录 + if skip_system: + dirs[:] = [d for d in dirs if not d.startswith('$') and d != "System Volume Information"] + + for file in files: + full_path = os.path.join(root, file) + + # 去掉盘符 + _, relative_path = os.path.splitdrive(full_path) + + # 替换 \ 为 / + relative_path = relative_path.lstrip("\\").replace("\\", "/") + + file_list.append(relative_path) + + return file_list + + +# if __name__ == "__main__": +# folder_path = r"Y:/folder1/" +# files_list = ScanDirectory(folder_path) +# +# print(f"共找到 {len(files_list)} 个文件:") +# for f in files_list: +# print(f) + + +def ScanMultiFolders(folder_paths, skip_system=True): + """ + 扫描多个根目录,返回所有子目录中的文件路径列表。 + + :param folder_paths: 包含多个根目录的列表 + :param skip_system: 是否跳过系统目录(默认 True) + :return: 所有文件的相对路径列表(格式为 folder/file.ext) + """ + all_files = [] + + for root_dir in folder_paths: + # 确保路径存在 + if not os.path.exists(root_dir): + print(f"⚠️ 路径不存在:{root_dir}") + continue + + for root, dirs, files in os.walk(root_dir): + # 跳过系统目录 + if skip_system: + dirs[:] = [d for d in dirs if not d.startswith('$') and d != "System Volume Information"] + + for file in files: + full_path = os.path.join(root, file) + + # 去掉盘符 + _, relative_path = os.path.splitdrive(full_path) + relative_path = relative_path.lstrip("\\").replace("\\", "/") + + all_files.append(relative_path) + + return all_files + + +if __name__ == "__main__": + folders = [ + r"Y:\CloudMusic", + r"Y:\folder1" + ] + + files = ScanMultiFolders(folders) + + print(f"共找到 {len(files)} 个文件:") + for f in files: + print(f)