"""Query file path records and their extent (fragment) records from SQLite."""

import sqlite3
from contextlib import closing
from operator import itemgetter

# Number of ``extentN_Location`` / ``extentN_Length`` column pairs that are
# read from a node row (extent1 .. extent4).
_EXTENT_SLOTS = 4


def GetFilesDBPathInfo(db_path: str = "../src/db_ntfs_info.db", table_name: str = "db_path", files_path=None) -> list:
    """Look up the ID and Name columns for each given file path.

    :param db_path: Path of the SQLite database file.
    :param table_name: Name of the table to query. It is interpolated into
        the SQL text (identifiers cannot be bound as parameters), so it must
        come from a trusted source.
    :param files_path: Iterable of full file paths; ``None`` or an empty
        sequence yields an empty result without opening the database.
    :return: One ``{'absolute_path': str, 'id': ..., 'name': ...}`` dict per
        path that has a matching row, in input order. Paths without a match,
        or whose query fails, are reported on stdout and skipped.
    """
    if not files_path:
        return []

    # Only the table name is formatted in; the path value is always bound.
    sql = f"SELECT ID, Name FROM {table_name} WHERE Path = ?"
    results = []

    # ``closing`` guarantees the connection is released even if iteration
    # raises (sqlite3's own context manager only manages transactions).
    with closing(sqlite3.connect(db_path)) as conn:
        cursor = conn.cursor()
        for path in files_path:
            try:
                row = cursor.execute(sql, (path,)).fetchone()
            except Exception as e:
                # Deliberate best effort: one bad lookup must not abort the batch.
                print(f"查询失败:{path},错误:{e}")
                continue

            if row:
                results.append({
                    'absolute_path': path,
                    'id': row[0],
                    'name': row[1]
                })
            else:
                print(f"未找到匹配记录:{path}")

    return results


def _collect_fragments(node_data: dict) -> list:
    """Return the non-empty extents of a node row as fragment dicts.

    An extent slot is kept only when both its location and length are
    present and the length is greater than zero.
    """
    fragments = []
    for i in range(1, _EXTENT_SLOTS + 1):
        loc = node_data.get(f"extent{i}_Location")
        length = node_data.get(f"extent{i}_Length")
        if loc is not None and length is not None and length > 0:
            fragments.append({
                "start_byte": loc,
                "length": length
            })
    return fragments


def GetFilesDBNodeInfo(db_path: str = "../src/db_ntfs_info.db", table_name: str = "db_node", path_records: list = None) -> list:
    """Fetch the extent (fragment) information for previously resolved paths.

    :param db_path: Path of the SQLite database file.
    :param table_name: Name of the node table. It is interpolated into the
        SQL text, so it must come from a trusted source.
    :param path_records: Records as returned by ``GetFilesDBPathInfo``; each
        must provide the keys ``'id'``, ``'absolute_path'`` and ``'name'``.
        ``None`` or an empty list yields an empty result without opening the
        database.
    :return: One dict per record that has a node row, with the keys
        ``absolute_path``, ``name``, ``path_id``, ``extent_count`` and
        ``fragments`` (a list of ``{'start_byte', 'length'}`` dicts).
        Records without a node row, or whose query fails, are reported on
        stdout and skipped.
    :raises KeyError: If a record lacks one of the required keys.
    """
    if not path_records:
        return []

    sql = f"SELECT * FROM {table_name} WHERE PathID = ?"
    results = []

    with closing(sqlite3.connect(db_path)) as conn:
        cursor = conn.cursor()
        for record in path_records:
            path_id = record['id']
            absolute_path = record['absolute_path']
            name = record['name']

            try:
                cursor.execute(sql, (path_id,))
                row = cursor.fetchone()

                if not row:
                    print(f"未找到 PathID={path_id} 在表 {table_name} 中的记录")
                    continue

                # Pair column names with values so fields can be read by name
                # regardless of the table's column order.
                columns = [desc[0] for desc in cursor.description]
                node_data = dict(zip(columns, row))

                results.append({
                    "absolute_path": absolute_path,
                    "name": name,
                    "path_id": path_id,
                    "extent_count": node_data.get("ExtentCount", 0),
                    "fragments": _collect_fragments(node_data)
                })
            except Exception as e:
                # Deliberate best effort: one bad record must not abort the batch.
                print(f"查询失败:PathID={path_id}, 错误:{e}")

    return results


def sort_fragments_by_start_byte(file_extents_list: list) -> list:
    """Flatten all files' fragments into one list ordered by ``start_byte``.

    :param file_extents_list: Result list of ``GetFilesDBNodeInfo``.
    :return: A list of dicts with the keys ``absolute_path``, ``filename``,
        ``start_byte``, ``length`` and ``fragment_index``. The index is the
        fragment's 1-based rank within its own file when that file's
        fragments are ordered by ``start_byte``.
    """
    by_start = itemgetter('start_byte')
    all_fragments = []

    for file_info in file_extents_list:
        absolute_path = file_info['absolute_path']
        filename = file_info['name']

        # Order within the file first so fragment_index reflects on-disk order.
        ordered = sorted(file_info['fragments'], key=by_start)
        all_fragments.extend(
            {
                'absolute_path': absolute_path,
                'filename': filename,
                'start_byte': fragment['start_byte'],
                'length': fragment['length'],
                'fragment_index': idx
            }
            for idx, fragment in enumerate(ordered, start=1)
        )

    # Global, stable ordering across every file's fragments.
    all_fragments.sort(key=by_start)
    return all_fragments


if __name__ == "__main__":
    test_files = [
        r"Y:\CloudMusic\AGA - MIZU.mp3",
        r"Y:\CloudMusic\AGA - 一.mp3",
        r"Y:\CloudMusic\Aaron Zigman - Main Title.mp3",
        r"Y:\CloudMusic\Anson Seabra - Keep Your Head Up Princess.mp3",
        r"Y:\CloudMusic\Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3",
        r"Y:\CloudMusic\Ava Max - Sweet but Psycho.mp3",
        r"Y:\CloudMusic\Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3",
        r"Y:\CloudMusic\Color Music Choir - Something Just Like This (Live).mp3"
    ]

    # Step 1: resolve each path to its ID and Name.
    path_info = GetFilesDBPathInfo(files_path=test_files)
    # Step 2: fetch the extent information for each resolved path.
    file_extents_data = GetFilesDBNodeInfo(path_records=path_info)
    # Step 3: order every fragment by its start offset and print it.
    result = sort_fragments_by_start_byte(file_extents_data)

    for item in result:
        print(item)