import os def ExtractVolumeLetter(path: str) -> str: """从绝对路径中提取盘符""" drive = os.path.splitdrive(path)[0] if not drive: raise ValueError(f"无法从路径中提取盘符:{path}") return drive[0].upper() # 返回 'Y' def CopySingleFragmentFiles(source_data_dict, target_path): """ 根据起始字节和长度,从磁盘中读取数据并保存为目标文件 :param source_data_dict: 包含源数据信息的字典 :param target_path: 目标文件夹路径 """ start_byte = source_data_dict.get("start_byte") byte_length = source_data_dict.get("length") absolute_path = source_data_dict.get("absolute_path") file_name = source_data_dict.get("filename") if byte_length <= 0: print("错误:字节长度无效") return if not absolute_path or not file_name: print("错误:缺少必要的文件信息") return source_disk_path = ExtractVolumeLetter(absolute_path) target_file_path = os.path.join(target_path, file_name) try: # 创建目标目录(如果不存在) os.makedirs(target_path, exist_ok=True) with open(fr"\\.\{source_disk_path}:", 'rb') as disk: disk.seek(start_byte) with open(target_file_path, 'wb') as f: remaining = byte_length CHUNK_SIZE = 1024 * 1024 # 1MB while remaining > 0: read_size = min(CHUNK_SIZE, remaining) chunk = disk.read(read_size) if not chunk: print("警告:读取到空数据,可能已到达磁盘末尾。") break f.write(chunk) remaining -= len(chunk) print( f"成功:已从字节偏移量 {start_byte} 读取 {byte_length} 字节,保存为 {target_file_path}") except PermissionError: print("错误:需要管理员权限访问磁盘设备,请以管理员身份运行此程序") except Exception as e: print(f"发生错误: {str(e)}") def ReadDiskBytes(volume_letter: str, start_byte: int, length: int) -> bytes: """ 从指定磁盘的指定起始位置读取指定长度的字节。 :param volume_letter: 盘符(如 "Y") :param start_byte: 起始字节位置(整数) :param length: 要读取的字节数(整数) :return: 读取到的原始字节数据(bytes) """ if not isinstance(volume_letter, str) or len(volume_letter.strip()) != 1: raise ValueError("drive_letter 必须是单个字母,如 'Y'") # 构建 Windows 设备路径格式:\\.\Y: disk_path = f"\\\\.\\{volume_letter.strip().upper()}:" try: with open(disk_path, "rb") as disk: disk.seek(start_byte) data = disk.read(length) return data except PermissionError: raise PermissionError("权限不足,请以管理员身份运行程序") except Exception as e: raise RuntimeError(f"读取磁盘失败:{e}") def CopyMultiFragmentFiles( item: dict, fragment_lists: dict, target_path: str ): """ 处理具有多个分片的文件,读取并按顺序拼接内容,最终写入磁盘。 :param item: 包含文件分片信息的字典 :param fragment_lists: 存储各文件分片内容的字典 :param target_path: 恢复文件的目标保存路径 :return: None """ file_name = item['filename'] extent_count = item['extent_count'] fragment_index = item['fragment_index'] start_byte = item['start_byte'] length_byte = item['length'] volume_letter = ExtractVolumeLetter(item['absolute_path']) # 读取分片内容 fragment_content = ReadDiskBytes(volume_letter, start_byte, length_byte) # 如果还没有为这个文件创建列表,则初始化 if file_name not in fragment_lists: fragment_lists[file_name] = [None] * extent_count # 将内容插入到指定位置 if fragment_index <= extent_count: fragment_lists[file_name][fragment_index - 1] = fragment_content print(f"已写入 {file_name} 的第 {fragment_index} 个片段。") else: print(f"警告:{file_name} 的 fragment_index 超出范围:{fragment_index} / {extent_count}") # 检查是否所有分片都已加载 fragments = fragment_lists[file_name] if None not in fragments: full_content = b''.join(fragments) target_file_path = os.path.join(target_path, file_name) try: with open(target_file_path, 'wb') as f: f.write(full_content) print(f"已成功恢复文件:{file_name}") except Exception as e: print(f"写入文件失败:{file_name},错误:{e}")