import os import sqlite3 from files_utils.files_sort import GetFilesDBNodeInfo, SortFragmentsByStartByte from ntfs_utils.main import volume_letter def GetFolderID( folder_path: str, db_path: str = "../src/db_ntfs_info.db", table_name: str = "db_path" ) -> int | None: """ 根据文件夹路径,查询数据库中该文件夹对应的 ID。 :param folder_path: 文件夹路径(如 r"CloudMusic\\") :param db_path: 数据库文件路径 :param table_name: 要查询的数据表名称,默认为 'db_path' :return: 成功则返回 ID(int),失败返回 None """ conn = sqlite3.connect(db_path) cursor = conn.cursor() try: # 使用 table_name 构建 SQL 查询 sql = f"SELECT ID FROM {table_name} WHERE Path = ?" cursor.execute(sql, (folder_path,)) result = cursor.fetchone() if result: return result[0] else: print(f"未找到路径:{folder_path} 在表 {table_name} 中") return None except sqlite3.Error as e: print(f"数据库操作失败:{e}") return None finally: conn.close() def GetSubPathsByParentID( parent_id: int, db_path: str = "../src/db_ntfs_info.db", table_name: str = "db_path" ) -> list: """ 根据 ParentID 查询 db_path 表中对应的子项(文件/文件夹)。 :param parent_id: 父节点 ID :param db_path: 数据库文件路径 :param table_name: 数据表名称 :return: 包含 ID、Path、Name 的字典列表 """ conn = sqlite3.connect(db_path) cursor = conn.cursor() sql = f""" SELECT ID, Path, Name FROM {table_name} WHERE ParentID = ? """ try: cursor.execute(sql, (parent_id,)) rows = cursor.fetchall() except Exception as e: print(f"数据库查询失败:{e}") return [] results = [] for row in rows: item = { 'id': row[0], 'absolute_path': row[1], 'name': row[2] } results.append(item) conn.close() return results def GetSortFragmentsByFolderPath(db_path: str = "../src/db_ntfs_info.db", folder_path: str = None) -> list: """ 根据文件夹路径,查询数据库中该文件夹下的所有文件的分片信息。 :param db_path: 要查询的数据库 :param folder_path: 文件夹的绝对路径 :return list: 文件夹下所有文件按片段顺序排列的列表 """ parent_id = GetFolderID(folder_path=folder_path, db_path=db_path) path_data = GetSubPathsByParentID(parent_id=parent_id, db_path=db_path) node_data = GetFilesDBNodeInfo(path_records=path_data) result = SortFragmentsByStartByte(node_data) return result # if __name__ == "__main__": # folder_path_test = "pictures/" # data = GetSortFragmentsByFolderPath(db_path="../src/db_ntfs_info.db", folder_path=folder_path_test) # for item in data: # print(item) def ScanDirectory(root_dir, skip_system=True): """ 递归扫描指定目录,返回相对于盘符的路径列表(使用 '/' 分隔),不包含盘符。 :param root_dir: 要扫描的根目录路径 :param skip_system: 是否跳过系统目录(默认 True) :return: 文件路径列表,格式为 relative/path/to/file.ext """ file_list = [] for root, dirs, files in os.walk(root_dir): # 跳过系统目录 if skip_system: dirs[:] = [d for d in dirs if not d.startswith('$') and d != "System Volume Information"] for file in files: full_path = os.path.join(root, file) # 去掉盘符 _, relative_path = os.path.splitdrive(full_path) # 替换 \ 为 / relative_path = relative_path.lstrip("\\").replace("\\", "/") file_list.append(relative_path) return file_list # if __name__ == "__main__": # folder_path = r"Y:/folder1/" # files_list = ScanDirectory(folder_path) # # print(f"共找到 {len(files_list)} 个文件:") # for f in files_list: # print(f) def ScanMultiFolders(folder_paths_list: list, skip_system: bool = True) -> list: """ 扫描多个根目录,返回所有文件的相对路径列表(格式为 folder/file.ext)。 :param folder_paths_list: 要扫描的根目录列表(如 ["CloudMusic/", "folder1/"]) :param skip_system: 是否跳过系统目录 :return: 文件路径列表(统一格式为 folder/file.ext) """ all_files = [] for root_dir in folder_paths_list: # 规范化输入路径,确保结尾有 '/'(如果是目录) normalized_root_dir = root_dir.replace("\\", "/") if not normalized_root_dir.endswith("/"): normalized_root_dir += "/" # 确保结尾 / full_root_path = f"{volume_letter}:/{normalized_root_dir}" full_root_path = os.path.normpath(full_root_path) if not os.path.exists(full_root_path): print(f"⚠️ 路径不存在:{full_root_path}") continue for root, dirs, files in os.walk(full_root_path): if skip_system: dirs[:] = [d for d in dirs if not d.startswith('$') and d != "System Volume Information"] for file in files: full_file_path = os.path.join(root, file) # 去掉盘符 _, relative_path = os.path.splitdrive(full_file_path) # 去除开头和结尾的 '\' 或 '/' 并替换分隔符 normalized_path = relative_path.strip("\\").strip("/").replace("\\", "/") all_files.append(normalized_path) return all_files # if __name__ == "__main__": # folders = [ # "CloudMusic\\", # "folder1/" # ] # # files = ScanMultiFolders(folders) # # print(f"共找到 {len(files)} 个文件:") # for f in files: # print(f) def ClassifyFilesAndFolders(paths: list) -> dict: """ 将传入的路径列表分类为文件和目录,并统一使用 '/' 分隔符。 确保目录路径以 '/' 结尾。 :param paths: 路径列表(元素可以是文件或目录) :return: 包含 'files' 和 'directories' 的字典,路径格式统一为 '/' """ files = [] directories = [] for path in paths: # 统一用 '/' 分隔符,并保留原始结构(是否以 '/' 结尾) normalized_path = path.replace("\\", "/") # 判断是否原本是目录(以 '/' 或 '\' 结尾) is_potential_dir = normalized_path.endswith("/") # 拼接完整路径用于判断是否存在 full_path = f"{volume_letter}:/{normalized_path.lstrip('/')}" full_path = os.path.normpath(full_path) if os.path.isfile(full_path): # 如果是文件,去掉结尾的 /(如果有的话) if normalized_path.endswith("/"): normalized_path = normalized_path.rstrip("/") files.append(normalized_path) elif os.path.isdir(full_path): # 如果是目录,确保以 '/' 结尾 if not normalized_path.endswith("/"): normalized_path += "/" directories.append(normalized_path) else: print(f"⚠️ 路径不存在或类型未知:{normalized_path}") return { 'files': files, 'folders': directories } # if __name__ == "__main__": # test_paths = [ # "CloudMusic\\AGA - MIZU.mp3", # "CloudMusic/AGA - 一.mp3", # "CloudMusic/Aaron Zigman - Main Title.mp3", # "CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3", # "CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3", # "CloudMusic/Ava Max - Sweet but Psycho.mp3", # "CloudMusic\\", # "folder1/", # "CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3", # "CloudMusic/Color Music Choir - Something Just Like This (Live).mp3" # ] # # result = ClassifyFilesAndFolders(test_paths) # # print("✅ 文件列表:") # for f in result['files']: # print(f) # # print("\n📁 文件夹列表:") # for d in result['directories']: # print(d)