diff --git a/fake_main.py b/fake_main.py
new file mode 100644
index 0000000..2b191f0
--- /dev/null
+++ b/fake_main.py
@@ -0,0 +1,32 @@
+from files_utils.files_list import GetFilesDBNodeInfo, GetFilesDBPathInfo, SortFragmentsByStartByte
+from files_utils.files_save import CopyFileFromBytes
+
+
+def GetSortFragments(files_list: list) -> list:
+    """
+    Look up the on-disk extents of the given files, ordered by start byte.
+
+    :param files_list: absolute paths of the files to look up
+    :return: fragment dictionaries as produced by SortFragmentsByStartByte
+    """
+    path_info = GetFilesDBPathInfo(db_path="./src/db_ntfs_info.db", table_name="db_path", files_path=files_list)
+    node_info = GetFilesDBNodeInfo(db_path="./src/db_ntfs_info.db", table_name="db_node", path_records=path_info)
+    result = SortFragmentsByStartByte(node_info)
+    return result
+
+
+test_files = [
+    r"Y:\CloudMusic\AGA - MIZU.mp3",
+    r"Y:\CloudMusic\AGA - 一.mp3",
+    r"Y:\CloudMusic\Aaron Zigman - Main Title.mp3",
+    r"Y:\CloudMusic\Anson Seabra - Keep Your Head Up Princess.mp3",
+    r"Y:\CloudMusic\Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3",
+    r"Y:\CloudMusic\Ava Max - Sweet but Psycho.mp3",
+    r"Y:\CloudMusic\Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3",
+    r"Y:\CloudMusic\Color Music Choir - Something Just Like This (Live).mp3"
+]
+
+sort_fragments = GetSortFragments(test_files)
+for item in sort_fragments:
+    if item["extent_count"] == 1:
+        CopyFileFromBytes(item, target_path=r"Z:\test_files")
diff --git a/files_utils/files_list.py b/files_utils/files_list.py
new file mode 100644
index 0000000..8ee237b
--- /dev/null
+++ b/files_utils/files_list.py
@@ -0,0 +1,160 @@
+import sqlite3
+
+
+def GetFilesDBPathInfo(db_path: str = "../src/db_ntfs_info.db",
+                       table_name: str = "db_path",
+                       files_path=None) -> list:
+    """
+    Look up the ID and Name columns for each of the given file paths in the specified table.
+
+    :param db_path: path to the SQLite database file
+    :param table_name: name of the table to query
+    :param files_path: list of absolute file paths
+    :return: list of results, each {'absolute_path': str, 'id': int, 'name': str}
+    """
+    if files_path is None:
+        files_path = []
+    results = []
+
+    # Connect to the database
+    conn = sqlite3.connect(db_path)
+    cursor = conn.cursor()
+
+    for path in files_path:
+        try:
+            # The table name is interpolated because SQL parameters can only bind values, not identifiers
+            sql = f"SELECT ID, Name FROM {table_name} WHERE Path = ?"
+            cursor.execute(sql, (path,))
+            row = cursor.fetchone()
+            if row:
+                results.append({
+                    'absolute_path': path,
+                    'id': row[0],
+                    'name': row[1]
+                })
+            else:
+                print(f"未找到匹配记录:{path}")
+        except Exception as e:
+            print(f"查询失败:{path},错误:{e}")
+
+    conn.close()
+    return results
+
+
+def GetFilesDBNodeInfo(db_path: str = "../src/db_ntfs_info.db", table_name: str = "db_node",
+                       path_records: list = None) -> list:
+    """
+    Look up the extent (fragment) information in the db_node table for each ID found in db_path.
+
+    :param db_path: path to the SQLite database file
+    :param table_name: name of the db_node table
+    :param path_records: result list returned by GetFilesDBPathInfo
+    :return: list of results containing each file's fragment information
+    """
+    conn = sqlite3.connect(db_path)
+    cursor = conn.cursor()
+
+    results = []
+
+    for record in path_records or []:  # tolerate the None default
+        path_id = record['id']
+        absolute_path = record['absolute_path']
+        name = record['name']
+
+        try:
+            # Fetch the db_node row that matches this PathID
+            cursor.execute(f"SELECT * FROM {table_name} WHERE PathID = ?", (path_id,))
+            row = cursor.fetchone()
+
+            if not row:
+                print(f"未找到 PathID={path_id} 在表 {table_name} 中的记录")
+                continue
+
+            # Collect the column names so values can be accessed by name
+            columns = [desc[0] for desc in cursor.description]
+
+            # Build a dict keyed by column name
+            node_data = dict(zip(columns, row))
+
+            # Read ExtentCount
+            extent_count = node_data.get("ExtentCount", 0)
+
+            # Parse the fragment information
+            fragments = []
+            for i in range(1, 5):  # extent1 ~ extent4
+                loc = node_data.get(f"extent{i}_Location")
+                length = node_data.get(f"extent{i}_Length")
+
+                if loc is not None and length is not None and length > 0:
+                    fragments.append({
+                        "start_byte": loc,
+                        "length": length
+                    })
+
+            results.append({
+                "absolute_path": absolute_path,
+                "name": name,
+                "path_id": path_id,
+                "extent_count": extent_count,
+                "fragments": fragments
+            })
+
+        except Exception as e:
+            print(f"查询失败:PathID={path_id}, 错误:{e}")
+
+    conn.close()
+    return results
+
+
+def SortFragmentsByStartByte(file_extents_list: list) -> list:
+    """
+    Sort the fragments of all files by start_byte and label each with its index within its file.
+
+    :param file_extents_list: result list returned by GetFilesDBNodeInfo
+    :return: list of fragments sorted by start_byte, including file path, file name and fragment index
+    """
+    all_fragments = []
+
+    for file_info in file_extents_list:
+        absolute_path = file_info['absolute_path']
+        filename = file_info['name']
+        extent_count = file_info['extent_count']
+        fragments = file_info['fragments']
+
+        # Sort this file's fragments (they are usually already in order)
+        sorted_fragments = sorted(fragments, key=lambda x: x['start_byte'])
+
+        # Attach the fragment index
+        for idx, fragment in enumerate(sorted_fragments, start=1):
+            all_fragments.append({
+                'absolute_path': absolute_path,
+                'filename': filename,
+                'extent_count': extent_count,
+                'start_byte': fragment['start_byte'],
+                'length': fragment['length'],
+                'fragment_index': idx
+            })
+
+    # Global sort: order every fragment by start_byte
+    all_fragments.sort(key=lambda x: x['start_byte'])
+
+    return all_fragments
+
+
+if __name__ == "__main__":
+    test_files = [
+        r"Y:\CloudMusic\AGA - MIZU.mp3",
+        r"Y:\CloudMusic\AGA - 一.mp3",
+        r"Y:\CloudMusic\Aaron Zigman - Main Title.mp3",
+        r"Y:\CloudMusic\Anson Seabra - Keep Your Head Up Princess.mp3",
+        r"Y:\CloudMusic\Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3",
+        r"Y:\CloudMusic\Ava Max - Sweet but Psycho.mp3",
+        r"Y:\CloudMusic\Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3",
+        r"Y:\CloudMusic\Color Music Choir - Something Just Like This (Live).mp3"
+    ]
+    path_info = GetFilesDBPathInfo(files_path=test_files)
+    node_info = GetFilesDBNodeInfo(path_records=path_info)
+    result = SortFragmentsByStartByte(node_info)
+
+    for item in result:
+        print(item)
diff --git a/files_utils/files_save.py b/files_utils/files_save.py
new file mode 100644
index 0000000..36ef801
--- /dev/null
+++ b/files_utils/files_save.py
@@ -0,0 +1,75 @@
+import os
+
+
+def extract_drive_letter(path: str) -> str:
+    """Extract the drive letter from an absolute path."""
+    drive = os.path.splitdrive(path)[0]
+    if len(drive) != 2 or drive[1] != ":":  # rejects empty and UNC drives
+        raise ValueError(f"无法从路径中提取盘符:{path}")
+    return drive[0].upper()  # e.g. 'Y'
+
+
+def CopyFileFromBytes(source_data_dict, target_path):
+    """
+    Read data from the disk at the given start byte and length and save it as the target file.
+
+    :param source_data_dict: dictionary describing the source data
+    :param target_path: path of the target folder
+    """
+    start_byte = source_data_dict.get("start_byte")
+    byte_length = source_data_dict.get("length")
+    absolute_path = source_data_dict.get("absolute_path")
+    file_name = source_data_dict.get("filename")
+
+    if not byte_length or byte_length <= 0:  # also rejects a missing (None) length
+        print("错误:字节长度无效")
+        return
+
+    if not absolute_path or not file_name:
+        print("错误:缺少必要的文件信息")
+        return
+
+    source_disk_path = extract_drive_letter(absolute_path)
+    target_file_path = os.path.join(target_path, file_name)
+
+    try:
+        # Create the target directory if it does not exist
+        os.makedirs(target_path, exist_ok=True)
+
+        with open(fr"\\.\{source_disk_path}:", 'rb') as disk:
+            disk.seek(start_byte)
+
+            with open(target_file_path, 'wb') as f:
+                remaining = byte_length
+                CHUNK_SIZE = 1024 * 1024  # 1MB
+                while remaining > 0:
+                    read_size = min(CHUNK_SIZE, remaining)
+                    chunk = disk.read(read_size)
+                    if not chunk:
+                        print("警告:读取到空数据,可能已到达磁盘末尾。")
+                        break
+                    f.write(chunk)
+                    remaining -= len(chunk)
+
+        # Report success only when every requested byte was copied
+        if remaining == 0:
+            print(
+                f"成功:已从字节偏移量 {start_byte} 读取 {byte_length} 字节,保存为 {target_file_path}")
+
+    except PermissionError:
+        print("错误:需要管理员权限访问磁盘设备,请以管理员身份运行此程序")
+    except Exception as e:
+        print(f"发生错误: {str(e)}")
+
+
+if __name__ == "__main__":
+    test_dict = {
+        'absolute_path': 'Y:\\CloudMusic\\Aaron Zigman - Main Title.mp3',
+        'filename': 'Aaron Zigman - Main Title.mp3',
+        'extent_count': 1,
+        'start_byte': 687685632,
+        'length': 7163904,
+        'fragment_index': 1
+    }
+
+    CopyFileFromBytes(test_dict, target_path=r"Z:\RecoveredFiles")
diff --git a/files_utils/public.py b/files_utils/public.py
new file mode 100644
index 0000000..261a24f
--- /dev/null
+++ b/files_utils/public.py
@@ -0,0 +1,40 @@
+def ReadDiskBytes(volume_letter: str, start_byte: int, length: int) -> bytes:
+    """
+    Read the given number of bytes from the given start position of the specified volume.
+
+    :param volume_letter: drive letter (e.g. "Y")
+    :param start_byte: start byte position (integer)
+    :param length: number of bytes to read (integer)
+    :return: the raw bytes that were read
+    :raises ValueError: if volume_letter is not a single-character string
+    :raises PermissionError: if the volume device cannot be opened for lack of privileges
+    :raises RuntimeError: if any other error occurs while reading
+    """
+    if not isinstance(volume_letter, str) or len(volume_letter.strip()) != 1:
+        raise ValueError("volume_letter 必须是单个字母,如 'Y'")
+
+    # Build the Windows volume device path, e.g. \\.\Y:
+    disk_path = f"\\\\.\\{volume_letter.strip().upper()}:"
+
+    try:
+        with open(disk_path, "rb") as disk:
+            disk.seek(start_byte)
+            data = disk.read(length)
+            return data
+    except PermissionError as e:
+        raise PermissionError("权限不足,请以管理员身份运行程序") from e
+    except Exception as e:
+        raise RuntimeError(f"读取磁盘失败:{e}") from e
+
+
+if __name__ == "__main__":
+    drive = "Y"
+    start = 687685632
+    size = 7163904
+
+    try:
+        content = ReadDiskBytes(drive, start, size)
+        print(f"成功读取 {len(content)} 字节内容。前100字节为:")
+        print(content[:100])
+    except Exception as e:
+        print("错误:", e)
diff --git a/test/SaveToFile.py b/test/SaveToFile.py
deleted file mode 100644
index 8497075..0000000
--- a/test/SaveToFile.py
+++ /dev/null
@@ -1,47 +0,0 @@
-def copy_file_from_bytes(start_byte, end_byte, source_disk_path, target_file_path):
-    """
-    根据起始字节和结束字节偏移量,从磁盘中读取指定范围的数据并保存为目标文件
-
-    参数:
-        start_byte (int): 起始字节偏移量(包含)
-        end_byte (int): 结束字节偏移量(包含)
-        source_disk_path (str): 源磁盘路径(如 r"\\.\Z:")
-        target_file_path (str): 目标文件路径(如 r"E:\demo.jpg")
-    """
-    if start_byte > end_byte:
-        print("错误:起始字节偏移量不能大于结束字节偏移量")
-        return
-
-    try:
-        with open(source_disk_path, 'rb') as disk:
-            # 计算总字节数
-            total_bytes = end_byte - start_byte + 1
-
-            # 定位到起始位置
-            disk.seek(start_byte)
-
-            # 读取指定范围内的数据
-            file_data = disk.read(total_bytes)
-
-            if not file_data or len(file_data) < total_bytes:
-                print(f"警告:只读取到 {len(file_data)} 字节,未达到预期 {total_bytes} 字节")
-
-            # 写入目标文件
-            with open(target_file_path, 'wb') as f:
-                f.write(file_data)
-
-            print(
-                f"成功:已从字节偏移量 {start_byte} 到 {end_byte} 读取 {len(file_data)} 字节,保存为 {target_file_path}")
-
-    except PermissionError:
-        print("错误:需要管理员权限访问磁盘设备,请以管理员身份运行此程序")
-    except Exception as e:
-        print(f"发生错误: {str(e)}")
-
-
-copy_file_from_bytes(
-    start_byte=687685632,
-    end_byte=687685632+7163904,
-    source_disk_path=r"\\.\Y:",
-    target_file_path=r"Z:\demo.mp3"
-)
diff --git a/test/files_list.py b/test/files_list.py
new file mode 100644
index 0000000..cddb4fa
--- /dev/null
+++ b/test/files_list.py
@@ -0,0 +1,158 @@
+import sqlite3
+
+
+def GetFilesDBPathInfo(db_path: str = "../src/db_ntfs_info.db",
+                       table_name: str = "db_path",
+                       files_path=None) -> list:
+    """
+    Look up the ID and Name columns for each of the given file paths in the specified table.
+
+    :param db_path: path to the SQLite database file
+    :param table_name: name of the table to query
+    :param files_path: list of absolute file paths
+    :return: list of results, each {'absolute_path': str, 'id': int, 'name': str}
+    """
+    if files_path is None:
+        files_path = []
+    results = []
+
+    # Connect to the database
+    conn = sqlite3.connect(db_path)
+    cursor = conn.cursor()
+
+    for path in files_path:
+        try:
+            # The table name is interpolated because SQL parameters can only bind values, not identifiers
+            sql = f"SELECT ID, Name FROM {table_name} WHERE Path = ?"
+            cursor.execute(sql, (path,))
+            row = cursor.fetchone()
+            if row:
+                results.append({
+                    'absolute_path': path,
+                    'id': row[0],
+                    'name': row[1]
+                })
+            else:
+                print(f"未找到匹配记录:{path}")
+        except Exception as e:
+            print(f"查询失败:{path},错误:{e}")
+
+    conn.close()
+    return results
+
+
+def GetFilesDBNodeInfo(db_path: str = "../src/db_ntfs_info.db", table_name: str = "db_node",
+                       path_records: list = None) -> list:
+    """
+    Look up the extent (fragment) information in the db_node table for each ID found in db_path.
+
+    :param db_path: path to the SQLite database file
+    :param table_name: name of the db_node table
+    :param path_records: result list returned by GetFilesDBPathInfo
+    :return: list of results containing each file's fragment information
+    """
+    conn = sqlite3.connect(db_path)
+    cursor = conn.cursor()
+
+    results = []
+
+    for record in path_records or []:  # tolerate the None default
+        path_id = record['id']
+        absolute_path = record['absolute_path']
+        name = record['name']
+
+        try:
+            # Fetch the db_node row that matches this PathID
+            cursor.execute(f"SELECT * FROM {table_name} WHERE PathID = ?", (path_id,))
+            row = cursor.fetchone()
+
+            if not row:
+                print(f"未找到 PathID={path_id} 在表 {table_name} 中的记录")
+                continue
+
+            # Collect the column names so values can be accessed by name
+            columns = [desc[0] for desc in cursor.description]
+
+            # Build a dict keyed by column name
+            node_data = dict(zip(columns, row))
+
+            # Read ExtentCount
+            extent_count = node_data.get("ExtentCount", 0)
+
+            # Parse the fragment information
+            fragments = []
+            for i in range(1, 5):  # extent1 ~ extent4
+                loc = node_data.get(f"extent{i}_Location")
+                length = node_data.get(f"extent{i}_Length")
+
+                if loc is not None and length is not None and length > 0:
+                    fragments.append({
+                        "start_byte": loc,
+                        "length": length
+                    })
+
+            results.append({
+                "absolute_path": absolute_path,
+                "name": name,
+                "path_id": path_id,
+                "extent_count": extent_count,
+                "fragments": fragments
+            })
+
+        except Exception as e:
+            print(f"查询失败:PathID={path_id}, 错误:{e}")
+
+    conn.close()
+    return results
+
+
+def sort_fragments_by_start_byte(file_extents_list: list) -> list:
+    """
+    Sort the fragments of all files by start_byte and label each with its index within its file.
+
+    :param file_extents_list: result list returned by GetFilesDBNodeInfo
+    :return: list of fragments sorted by start_byte, including file path, file name and fragment index
+    """
+    all_fragments = []
+
+    for file_info in file_extents_list:
+        absolute_path = file_info['absolute_path']
+        filename = file_info['name']
+        fragments = file_info['fragments']
+
+        # Sort this file's fragments (they are usually already in order)
+        sorted_fragments = sorted(fragments, key=lambda x: x['start_byte'])
+
+        # Attach the fragment index
+        for idx, fragment in enumerate(sorted_fragments, start=1):
+            all_fragments.append({
+                'absolute_path': absolute_path,
+                'filename': filename,
+                'start_byte': fragment['start_byte'],
+                'length': fragment['length'],
+                'fragment_index': idx
+            })
+
+    # Global sort: order every fragment by start_byte
+    all_fragments.sort(key=lambda x: x['start_byte'])
+
+    return all_fragments
+
+
+if __name__ == "__main__":
+    test_files = [
+        r"Y:\CloudMusic\AGA - MIZU.mp3",
+        r"Y:\CloudMusic\AGA - 一.mp3",
+        r"Y:\CloudMusic\Aaron Zigman - Main Title.mp3",
+        r"Y:\CloudMusic\Anson Seabra - Keep Your Head Up Princess.mp3",
+        r"Y:\CloudMusic\Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3",
+        r"Y:\CloudMusic\Ava Max - Sweet but Psycho.mp3",
+        r"Y:\CloudMusic\Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3",
+        r"Y:\CloudMusic\Color Music Choir - Something Just Like This (Live).mp3"
+    ]
+    path_info = GetFilesDBPathInfo(files_path=test_files)
+    file_extents_data = GetFilesDBNodeInfo(path_records=path_info)
+    result = sort_fragments_by_start_byte(file_extents_data)
+
+    for item in result:
+        print(item)
diff --git a/test/files_save.py b/test/files_save.py
new file mode 100644
index 0000000..f6bbd0d
--- /dev/null
+++ b/test/files_save.py
@@ -0,0 +1,74 @@
+import os
+
+
+def extract_drive_letter(path: str) -> str:
+    """Extract the drive letter from an absolute path."""
+    drive = os.path.splitdrive(path)[0]
+    if len(drive) != 2 or drive[1] != ":":  # rejects empty and UNC drives
+        raise ValueError(f"无法从路径中提取盘符:{path}")
+    return drive[0].upper()  # e.g. 'Y'
+
+
+def CopyFileFromBytes(source_data_dict, target_path):
+    """
+    Read data from the disk at the given start byte and length and save it as the target file.
+
+    :param source_data_dict: dictionary describing the source data
+    :param target_path: path of the target folder
+    """
+    start_byte = source_data_dict.get("start_byte")
+    byte_length = source_data_dict.get("length")
+    absolute_path = source_data_dict.get("absolute_path")
+    file_name = source_data_dict.get("filename")
+
+    if not byte_length or byte_length <= 0:  # also rejects a missing (None) length
+        print("错误:字节长度无效")
+        return
+
+    if not absolute_path or not file_name:
+        print("错误:缺少必要的文件信息")
+        return
+
+    source_disk_path = extract_drive_letter(absolute_path)
+    target_file_path = os.path.join(target_path, file_name)
+
+    try:
+        # Create the target directory if it does not exist
+        os.makedirs(target_path, exist_ok=True)
+
+        with open(fr"\\.\{source_disk_path}:", 'rb') as disk:
+            disk.seek(start_byte)
+
+            with open(target_file_path, 'wb') as f:
+                remaining = byte_length
+                CHUNK_SIZE = 1024 * 1024  # 1MB
+                while remaining > 0:
+                    read_size = min(CHUNK_SIZE, remaining)
+                    chunk = disk.read(read_size)
+                    if not chunk:
+                        print("警告:读取到空数据,可能已到达磁盘末尾。")
+                        break
+                    f.write(chunk)
+                    remaining -= len(chunk)
+
+        # Report success only when every requested byte was copied
+        if remaining == 0:
+            print(
+                f"成功:已从字节偏移量 {start_byte} 读取 {byte_length} 字节,保存为 {target_file_path}")
+
+    except PermissionError:
+        print("错误:需要管理员权限访问磁盘设备,请以管理员身份运行此程序")
+    except Exception as e:
+        print(f"发生错误: {str(e)}")
+
+
+test_dict = {
+    'absolute_path': 'Y:\\CloudMusic\\Aaron Zigman - Main Title.mp3',
+    'filename': 'Aaron Zigman - Main Title.mp3',
+    'extent_count': 1,
+    'start_byte': 687685632,
+    'length': 7163904,
+    'fragment_index': 1
+}
+
+CopyFileFromBytes(test_dict, target_path=r"Z:\RecoveredFiles")