diff --git a/fake_main.py b/fake_main.py index 2b191f0..b3a6895 100644 --- a/fake_main.py +++ b/fake_main.py @@ -1,14 +1,7 @@ -from files_utils.files_list import GetFilesDBNodeInfo, GetFilesDBPathInfo, SortFragmentsByStartByte -from files_utils.files_save import CopyFileFromBytes - - -def GetSortFragments(files_list: list) -> list: - path_info = GetFilesDBPathInfo(db_path="./src/db_ntfs_info.db", table_name="db_path", files_path=files_list) - node_info = GetFilesDBNodeInfo(db_path="./src/db_ntfs_info.db", table_name="db_node", path_records=path_info) - result = SortFragmentsByStartByte(node_info) - return result - +from files_utils.files_save import CopySingleFragmentFiles, CopyMultiFragmentFiles +from files_utils.files_sort import GetSortFragments +fragment_lists = {} test_files = [ r"Y:\CloudMusic\AGA - MIZU.mp3", r"Y:\CloudMusic\AGA - 一.mp3", @@ -20,7 +13,10 @@ test_files = [ r"Y:\CloudMusic\Color Music Choir - Something Just Like This (Live).mp3" ] -sort_fragments = GetSortFragments(test_files) +sort_fragments = GetSortFragments(db_path="./src/db_ntfs_info.db", files_list=test_files) for item in sort_fragments: - if item["extent_count"] == 1: - CopyFileFromBytes(item, target_path=r"Z:\test_files") + extent_count = item['extent_count'] + if extent_count == 1: + CopySingleFragmentFiles(item, target_path=r"Z:\test_files") + elif extent_count > 1: + CopyMultiFragmentFiles(item, fragment_lists=fragment_lists, target_path=r"Z:\test_files") diff --git a/files_utils/files_save.py b/files_utils/files_save.py index 36ef801..57b4d76 100644 --- a/files_utils/files_save.py +++ b/files_utils/files_save.py @@ -1,7 +1,7 @@ import os -def extract_drive_letter(path: str) -> str: +def ExtractVolumeLetter(path: str) -> str: """从绝对路径中提取盘符""" drive = os.path.splitdrive(path)[0] if not drive: @@ -9,7 +9,7 @@ def extract_drive_letter(path: str) -> str: return drive[0].upper() # 返回 'Y' -def CopyFileFromBytes(source_data_dict, target_path): +def CopySingleFragmentFiles(source_data_dict, target_path): """ 根据起始字节和长度,从磁盘中读取数据并保存为目标文件 @@ -29,7 +29,7 @@ def CopyFileFromBytes(source_data_dict, target_path): print("错误:缺少必要的文件信息") return - source_disk_path = extract_drive_letter(absolute_path) + source_disk_path = ExtractVolumeLetter(absolute_path) target_file_path = os.path.join(target_path, file_name) try: @@ -60,14 +60,75 @@ def CopyFileFromBytes(source_data_dict, target_path): print(f"发生错误: {str(e)}") -if __name__ == "__main__": - test_dict = { - 'absolute_path': 'Y:\\CloudMusic\\Aaron Zigman - Main Title.mp3', - 'filename': 'Aaron Zigman - Main Title.mp3', - 'extent_count': 1, - 'start_byte': 687685632, - 'length': 7163904, - 'fragment_index': 1 - } +def ReadDiskBytes(volume_letter: str, start_byte: int, length: int) -> bytes: + """ + 从指定磁盘的指定起始位置读取指定长度的字节。 - CopyFileFromBytes(test_dict, target_path=r"Z:\RecoveredFiles") + :param volume_letter: 盘符(如 "Y") + :param start_byte: 起始字节位置(整数) + :param length: 要读取的字节数(整数) + :return: 读取到的原始字节数据(bytes) + """ + if not isinstance(volume_letter, str) or len(volume_letter.strip()) != 1: + raise ValueError("drive_letter 必须是单个字母,如 'Y'") + + # 构建 Windows 设备路径格式:\\.\Y: + disk_path = f"\\\\.\\{volume_letter.strip().upper()}:" + + try: + with open(disk_path, "rb") as disk: + disk.seek(start_byte) + data = disk.read(length) + return data + except PermissionError: + raise PermissionError("权限不足,请以管理员身份运行程序") + except Exception as e: + raise RuntimeError(f"读取磁盘失败:{e}") + + +def CopyMultiFragmentFiles( + item: dict, + fragment_lists: dict, + target_path: str +): + """ + 处理具有多个分片的文件,读取并按顺序拼接内容,最终写入磁盘。 + + :param item: 包含文件分片信息的字典 + :param fragment_lists: 存储各文件分片内容的字典 + :param target_path: 恢复文件的目标保存路径 + :return: None + """ + file_name = item['filename'] + extent_count = item['extent_count'] + fragment_index = item['fragment_index'] + start_byte = item['start_byte'] + length_byte = item['length'] + + volume_letter = ExtractVolumeLetter(item['absolute_path']) + + # 读取分片内容 + fragment_content = ReadDiskBytes(volume_letter, start_byte, length_byte) + + # 如果还没有为这个文件创建列表,则初始化 + if file_name not in fragment_lists: + fragment_lists[file_name] = [None] * extent_count + + # 将内容插入到指定位置 + if fragment_index <= extent_count: + fragment_lists[file_name][fragment_index - 1] = fragment_content + print(f"已写入 {file_name} 的第 {fragment_index} 个片段。") + else: + print(f"警告:{file_name} 的 fragment_index 超出范围:{fragment_index} / {extent_count}") + + # 检查是否所有分片都已加载 + fragments = fragment_lists[file_name] + if None not in fragments: + full_content = b''.join(fragments) + target_file_path = os.path.join(target_path, file_name) + try: + with open(target_file_path, 'wb') as f: + f.write(full_content) + print(f"已成功恢复文件:{file_name}") + except Exception as e: + print(f"写入文件失败:{file_name},错误:{e}") diff --git a/files_utils/files_list.py b/files_utils/files_sort.py similarity index 86% rename from files_utils/files_list.py rename to files_utils/files_sort.py index 8ee237b..4b2d695 100644 --- a/files_utils/files_list.py +++ b/files_utils/files_sort.py @@ -141,20 +141,8 @@ def SortFragmentsByStartByte(file_extents_list: list) -> list: return all_fragments -if __name__ == "__main__": - test_files = [ - r"Y:\CloudMusic\AGA - MIZU.mp3", - r"Y:\CloudMusic\AGA - 一.mp3", - r"Y:\CloudMusic\Aaron Zigman - Main Title.mp3", - r"Y:\CloudMusic\Anson Seabra - Keep Your Head Up Princess.mp3", - r"Y:\CloudMusic\Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3", - r"Y:\CloudMusic\Ava Max - Sweet but Psycho.mp3", - r"Y:\CloudMusic\Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3", - r"Y:\CloudMusic\Color Music Choir - Something Just Like This (Live).mp3" - ] - path_info = GetFilesDBPathInfo(files_path=test_files) - node_info = GetFilesDBNodeInfo(path_records=path_info) +def GetSortFragments(db_path: str = "../src/db_ntfs_info.db", files_list: list = None) -> list: + path_info = GetFilesDBPathInfo(db_path=db_path, table_name="db_path", files_path=files_list) + node_info = GetFilesDBNodeInfo(db_path=db_path, table_name="db_node", path_records=path_info) result = SortFragmentsByStartByte(node_info) - - for item in result: - print(item) + return result diff --git a/files_utils/public.py b/files_utils/public.py deleted file mode 100644 index 261a24f..0000000 --- a/files_utils/public.py +++ /dev/null @@ -1,37 +0,0 @@ -def ReadDiskBytes(volume_letter: str, start_byte: int, length: int) -> bytes: - """ - 从指定磁盘的指定起始位置读取指定长度的字节。 - - :param volume_letter: 盘符(如 "Y") - :param start_byte: 起始字节位置(整数) - :param length: 要读取的字节数(整数) - :return: 读取到的原始字节数据(bytes) - """ - if not isinstance(volume_letter, str) or len(volume_letter.strip()) != 1: - raise ValueError("drive_letter 必须是单个字母,如 'Y'") - - # 构建 Windows 设备路径格式:\\.\Y: - disk_path = f"\\\\.\\{volume_letter.strip().upper()}:" - - try: - with open(disk_path, "rb") as disk: - disk.seek(start_byte) - data = disk.read(length) - return data - except PermissionError: - raise PermissionError("权限不足,请以管理员身份运行程序") - except Exception as e: - raise RuntimeError(f"读取磁盘失败:{e}") - - -if __name__ == "__main__": - drive = "Y" - start = 687685632 - size = 7163904 - - try: - content = ReadDiskBytes(drive, start, size) - print(f"成功读取 {len(content)} 字节内容。前100字节为:") - print(content[:100]) - except Exception as e: - print("错误:", e) diff --git a/test/fake_main.py b/test/fake_main.py new file mode 100644 index 0000000..cee78aa --- /dev/null +++ b/test/fake_main.py @@ -0,0 +1,36 @@ +from files_save import CopyMultiFragmentFiles, CopySingleFragmentFiles + +target_path = r"Z:\Recovered" +# 存储各个文件的分片内容列表 +fragment_lists = {} +test_file_sort = [{'absolute_path': 'Y:\\CloudMusic\\AGA - 一.mp3', 'filename': 'AGA - 一.mp3', 'extent_count': 2, + 'start_byte': 23162880, 'length': 69632, 'fragment_index': 1}, + {'absolute_path': 'Y:\\CloudMusic\\Aaron Zigman - Main Title.mp3', + 'filename': 'Aaron Zigman - Main Title.mp3', 'extent_count': 1, 'start_byte': 687685632, + 'length': 7163904, 'fragment_index': 1}, + {'absolute_path': 'Y:\\CloudMusic\\AGA - MIZU.mp3', 'filename': 'AGA - MIZU.mp3', 'extent_count': 1, + 'start_byte': 694849536, 'length': 8126464, 'fragment_index': 1}, + {'absolute_path': 'Y:\\CloudMusic\\AGA - 一.mp3', 'filename': 'AGA - 一.mp3', 'extent_count': 2, + 'start_byte': 702976000, 'length': 10870784, 'fragment_index': 2}, + {'absolute_path': 'Y:\\CloudMusic\\Anson Seabra - Keep Your Head Up Princess.mp3', + 'filename': 'Anson Seabra - Keep Your Head Up Princess.mp3', 'extent_count': 1, + 'start_byte': 713846784, 'length': 7970816, 'fragment_index': 1}, { + 'absolute_path': 'Y:\\CloudMusic\\Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3', + 'filename': 'Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3', + 'extent_count': 1, 'start_byte': 721817600, 'length': 9179136, 'fragment_index': 1}, + {'absolute_path': 'Y:\\CloudMusic\\Ava Max - Sweet but Psycho.mp3', + 'filename': 'Ava Max - Sweet but Psycho.mp3', 'extent_count': 1, 'start_byte': 731000832, + 'length': 7938048, 'fragment_index': 1}, + {'absolute_path': 'Y:\\CloudMusic\\Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3', + 'filename': 'Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3', 'extent_count': 1, + 'start_byte': 738938880, 'length': 6791168, 'fragment_index': 1}, + {'absolute_path': 'Y:\\CloudMusic\\Color Music Choir - Something Just Like This (Live).mp3', + 'filename': 'Color Music Choir - Something Just Like This (Live).mp3', 'extent_count': 1, + 'start_byte': 745730048, 'length': 6193152, 'fragment_index': 1}] + +for item in test_file_sort: + extent_count = item['extent_count'] + if extent_count == 1: + CopySingleFragmentFiles(item, target_path) + elif extent_count > 1: + CopyMultiFragmentFiles(item, fragment_lists, target_path) diff --git a/test/files_save.py b/test/files_save.py index f6bbd0d..0c60797 100644 --- a/test/files_save.py +++ b/test/files_save.py @@ -1,7 +1,7 @@ import os -def extract_drive_letter(path: str) -> str: +def ExtractVolumeLetter(path: str) -> str: """从绝对路径中提取盘符""" drive = os.path.splitdrive(path)[0] if not drive: @@ -9,7 +9,7 @@ def extract_drive_letter(path: str) -> str: return drive[0].upper() # 返回 'Y' -def CopyFileFromBytes(source_data_dict, target_path): +def CopySingleFragmentFiles(source_data_dict, target_path): """ 根据起始字节和长度,从磁盘中读取数据并保存为目标文件 @@ -29,7 +29,7 @@ def CopyFileFromBytes(source_data_dict, target_path): print("错误:缺少必要的文件信息") return - source_disk_path = extract_drive_letter(absolute_path) + source_disk_path = ExtractVolumeLetter(absolute_path) target_file_path = os.path.join(target_path, file_name) try: @@ -60,13 +60,101 @@ def CopyFileFromBytes(source_data_dict, target_path): print(f"发生错误: {str(e)}") -test_dict = { - 'absolute_path': 'Y:\\CloudMusic\\Aaron Zigman - Main Title.mp3', - 'filename': 'Aaron Zigman - Main Title.mp3', - 'extent_count': 1, - 'start_byte': 687685632, - 'length': 7163904, - 'fragment_index': 1 -} +def ReadDiskBytes(volume_letter: str, start_byte: int, length: int) -> bytes: + """ + 从指定磁盘的指定起始位置读取指定长度的字节。 -CopyFileFromBytes(test_dict, target_path=r"Z:\RecoveredFiles") + :param volume_letter: 盘符(如 "Y") + :param start_byte: 起始字节位置(整数) + :param length: 要读取的字节数(整数) + :return: 读取到的原始字节数据(bytes) + """ + if not isinstance(volume_letter, str) or len(volume_letter.strip()) != 1: + raise ValueError("drive_letter 必须是单个字母,如 'Y'") + + # 构建 Windows 设备路径格式:\\.\Y: + disk_path = f"\\\\.\\{volume_letter.strip().upper()}:" + + try: + with open(disk_path, "rb") as disk: + disk.seek(start_byte) + data = disk.read(length) + return data + except PermissionError: + raise PermissionError("权限不足,请以管理员身份运行程序") + except Exception as e: + raise RuntimeError(f"读取磁盘失败:{e}") + + +# if __name__ == "__main__": +# drive = "Y" +# start = 687685632 +# size = 7163904 +# +# try: +# content = ReadDiskBytes(drive, start, size) +# print(f"成功读取 {len(content)} 字节内容。前100字节为:") +# print(content[:100]) +# except Exception as e: +# print("错误:", e) + + +def CopyMultiFragmentFiles( + item: dict, + fragment_lists: dict, + target_path: str +): + """ + 处理具有多个分片的文件,读取并按顺序拼接内容,最终写入磁盘。 + + :param item: 包含文件分片信息的字典 + :param fragment_lists: 存储各文件分片内容的字典 + :param target_path: 恢复文件的目标保存路径 + :return: None + """ + file_name = item['filename'] + extent_count = item['extent_count'] + fragment_index = item['fragment_index'] + start_byte = item['start_byte'] + length_byte = item['length'] + + volume_letter = ExtractVolumeLetter(item['absolute_path']) + + # 读取分片内容 + fragment_content = ReadDiskBytes(volume_letter, start_byte, length_byte) + + # 如果还没有为这个文件创建列表,则初始化 + if file_name not in fragment_lists: + fragment_lists[file_name] = [None] * extent_count + + # 将内容插入到指定位置 + if fragment_index <= extent_count: + fragment_lists[file_name][fragment_index - 1] = fragment_content + print(f"已写入 {file_name} 的第 {fragment_index} 个片段。") + else: + print(f"警告:{file_name} 的 fragment_index 超出范围:{fragment_index} / {extent_count}") + + # 检查是否所有分片都已加载 + fragments = fragment_lists[file_name] + if None not in fragments: + full_content = b''.join(fragments) + target_file_path = os.path.join(target_path, file_name) + try: + with open(target_file_path, 'wb') as f: + f.write(full_content) + print(f"已成功恢复文件:{file_name}") + except Exception as e: + print(f"写入文件失败:{file_name},错误:{e}") + + +if __name__ == "__main__": + test_dict = { + 'absolute_path': 'Y:\\CloudMusic\\Aaron Zigman - Main Title.mp3', + 'filename': 'Aaron Zigman - Main Title.mp3', + 'extent_count': 1, + 'start_byte': 687685632, + 'length': 7163904, + 'fragment_index': 1 + } + + CopySingleFragmentFiles(test_dict, target_path=r"Z:\RecoveredFiles") diff --git a/test/files_list.py b/test/files_sort.py similarity index 74% rename from test/files_list.py rename to test/files_sort.py index cddb4fa..85d4210 100644 --- a/test/files_list.py +++ b/test/files_sort.py @@ -158,6 +158,7 @@ def sort_fragments_by_start_byte(file_extents_list: list) -> list: for file_info in file_extents_list: absolute_path = file_info['absolute_path'] filename = file_info['name'] + extent_count = file_info['extent_count'] fragments = file_info['fragments'] # 对当前文件的片段排序(虽然通常已经是有序的) @@ -168,6 +169,7 @@ def sort_fragments_by_start_byte(file_extents_list: list) -> list: all_fragments.append({ 'absolute_path': absolute_path, 'filename': filename, + 'extent_count': extent_count, 'start_byte': fragment['start_byte'], 'length': fragment['length'], 'fragment_index': idx @@ -190,9 +192,31 @@ if __name__ == "__main__": r"Y:\CloudMusic\Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3", r"Y:\CloudMusic\Color Music Choir - Something Just Like This (Live).mp3" ] - path_info = GetFilesDBPathInfo(files_path=test_files) - file_extents_data = GetFilesDBNodeInfo(path_records=path_info) - result = sort_fragments_by_start_byte(file_extents_data) + test_files_sort = [ + {'absolute_path': 'Y:\\CloudMusic\\AGA - MIZU.mp3', 'name': 'AGA - MIZU.mp3', 'path_id': 6, 'extent_count': 1, + 'fragments': [{'start_byte': 694849536, 'length': 8126464}]}, + {'absolute_path': 'Y:\\CloudMusic\\AGA - 一.mp3', 'name': 'AGA - 一.mp3', 'path_id': 7, 'extent_count': 2, + 'fragments': [{'start_byte': 702976000, 'length': 10870784}, {'start_byte': 23162880, 'length': 69632}]}, + {'absolute_path': 'Y:\\CloudMusic\\Aaron Zigman - Main Title.mp3', 'name': 'Aaron Zigman - Main Title.mp3', + 'path_id': 5, 'extent_count': 1, 'fragments': [{'start_byte': 687685632, 'length': 7163904}]}, + {'absolute_path': 'Y:\\CloudMusic\\Anson Seabra - Keep Your Head Up Princess.mp3', + 'name': 'Anson Seabra - Keep Your Head Up Princess.mp3', 'path_id': 8, 'extent_count': 1, + 'fragments': [{'start_byte': 713846784, 'length': 7970816}]}, + {'absolute_path': 'Y:\\CloudMusic\\Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3', + 'name': 'Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3', 'path_id': 9, + 'extent_count': 1, 'fragments': [{'start_byte': 721817600, 'length': 9179136}]}, + {'absolute_path': 'Y:\\CloudMusic\\Ava Max - Sweet but Psycho.mp3', 'name': 'Ava Max - Sweet but Psycho.mp3', + 'path_id': 10, 'extent_count': 1, 'fragments': [{'start_byte': 731000832, 'length': 7938048}]}, + {'absolute_path': 'Y:\\CloudMusic\\Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3', + 'name': 'Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3', 'path_id': 11, 'extent_count': 1, + 'fragments': [{'start_byte': 738938880, 'length': 6791168}]}, + {'absolute_path': 'Y:\\CloudMusic\\Color Music Choir - Something Just Like This (Live).mp3', + 'name': 'Color Music Choir - Something Just Like This (Live).mp3', 'path_id': 12, 'extent_count': 1, + 'fragments': [{'start_byte': 745730048, 'length': 6193152}]}] - for item in result: - print(item) + # path_info = GetFilesDBPathInfo(files_path=test_files) + # file_extents_data = GetFilesDBNodeInfo(path_records=path_info) + result = sort_fragments_by_start_byte(test_files_sort) + print(result) + # for item in result: + # print(item) diff --git a/main.py b/test/folders_save.py similarity index 100% rename from main.py rename to test/folders_save.py