132 lines
4.6 KiB
Python
132 lines
4.6 KiB
Python
import os
|
||
|
||
|
||
def GetVolumeLetter() -> str:
|
||
from ntfs_utils.main import volume_letter
|
||
return volume_letter
|
||
|
||
|
||
def CopySingleFragmentFiles(source_data_dict, target_path):
|
||
"""
|
||
根据起始字节和长度,从磁盘中读取数据并保存为目标文件
|
||
|
||
:param source_data_dict: 包含源数据信息的字典
|
||
:param target_path: 目标文件夹路径
|
||
"""
|
||
start_byte = source_data_dict.get("start_byte")
|
||
byte_length = source_data_dict.get("length")
|
||
absolute_path = source_data_dict.get("absolute_path")
|
||
file_name = source_data_dict.get("filename")
|
||
|
||
if byte_length <= 0:
|
||
print("错误:字节长度无效")
|
||
return
|
||
|
||
if not absolute_path or not file_name:
|
||
print("错误:缺少必要的文件信息")
|
||
return
|
||
|
||
source_disk_path = GetVolumeLetter()
|
||
target_file_path = os.path.join(target_path, file_name)
|
||
|
||
try:
|
||
# 创建目标目录(如果不存在)
|
||
os.makedirs(target_path, exist_ok=True)
|
||
|
||
with open(fr"\\.\{source_disk_path}:", 'rb') as disk:
|
||
disk.seek(start_byte)
|
||
|
||
with open(target_file_path, 'wb') as f:
|
||
remaining = byte_length
|
||
CHUNK_SIZE = 1024 * 1024 # 1MB
|
||
while remaining > 0:
|
||
read_size = min(CHUNK_SIZE, remaining)
|
||
chunk = disk.read(read_size)
|
||
if not chunk:
|
||
print("警告:读取到空数据,可能已到达磁盘末尾。")
|
||
break
|
||
f.write(chunk)
|
||
remaining -= len(chunk)
|
||
|
||
print(
|
||
f"成功:已从字节偏移量 {start_byte} 读取 {byte_length} 字节,保存为 {target_file_path}")
|
||
|
||
except PermissionError:
|
||
print("错误:需要管理员权限访问磁盘设备,请以管理员身份运行此程序")
|
||
except Exception as e:
|
||
print(f"发生错误: {str(e)}")
|
||
|
||
|
||
def ReadDiskBytes(volume_letter: str, start_byte: int, length: int) -> bytes:
|
||
"""
|
||
从指定磁盘的指定起始位置读取指定长度的字节。
|
||
|
||
:param volume_letter: 盘符(如 "Y")
|
||
:param start_byte: 起始字节位置(整数)
|
||
:param length: 要读取的字节数(整数)
|
||
:return: 读取到的原始字节数据(bytes)
|
||
"""
|
||
if not isinstance(volume_letter, str) or len(volume_letter.strip()) != 1:
|
||
raise ValueError("drive_letter 必须是单个字母,如 'Y'")
|
||
|
||
# 构建 Windows 设备路径格式:\\.\Y:
|
||
disk_path = f"\\\\.\\{volume_letter.strip().upper()}:"
|
||
|
||
try:
|
||
with open(disk_path, "rb") as disk:
|
||
disk.seek(start_byte)
|
||
data = disk.read(length)
|
||
return data
|
||
except PermissionError:
|
||
raise PermissionError("权限不足,请以管理员身份运行程序")
|
||
except Exception as e:
|
||
raise RuntimeError(f"读取磁盘失败:{e}")
|
||
|
||
|
||
def CopyMultiFragmentFiles(
|
||
item: dict,
|
||
fragment_lists: dict,
|
||
target_path: str
|
||
):
|
||
"""
|
||
处理具有多个分片的文件,读取并按顺序拼接内容,最终写入磁盘。
|
||
|
||
:param item: 包含文件分片信息的字典
|
||
:param fragment_lists: 存储各文件分片内容的字典
|
||
:param target_path: 恢复文件的目标保存路径
|
||
:return: None
|
||
"""
|
||
file_name = item['filename']
|
||
extent_count = item['extent_count']
|
||
fragment_index = item['fragment_index']
|
||
start_byte = item['start_byte']
|
||
length_byte = item['length']
|
||
|
||
volume_letter = GetVolumeLetter()
|
||
|
||
# 读取分片内容
|
||
fragment_content = ReadDiskBytes(volume_letter, start_byte, length_byte)
|
||
|
||
# 如果还没有为这个文件创建列表,则初始化
|
||
if file_name not in fragment_lists:
|
||
fragment_lists[file_name] = [None] * extent_count
|
||
|
||
# 将内容插入到指定位置
|
||
if fragment_index <= extent_count:
|
||
fragment_lists[file_name][fragment_index - 1] = fragment_content
|
||
print(f"已写入 {file_name} 的第 {fragment_index} 个片段。")
|
||
else:
|
||
print(f"警告:{file_name} 的 fragment_index 超出范围:{fragment_index} / {extent_count}")
|
||
|
||
# 检查是否所有分片都已加载
|
||
fragments = fragment_lists[file_name]
|
||
if None not in fragments:
|
||
full_content = b''.join(fragments)
|
||
target_file_path = os.path.join(target_path, file_name)
|
||
try:
|
||
with open(target_file_path, 'wb') as f:
|
||
f.write(full_content)
|
||
print(f"已成功恢复文件:{file_name}")
|
||
except Exception as e:
|
||
print(f"写入文件失败:{file_name},错误:{e}")
|