Files
fastcopy/test/files_save.py
2025-05-22 13:03:09 +08:00

161 lines
5.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
def ExtractVolumeLetter(path: str) -> str:
"""从绝对路径中提取盘符"""
drive = os.path.splitdrive(path)[0]
if not drive:
raise ValueError(f"无法从路径中提取盘符:{path}")
return drive[0].upper() # 返回 'Y'
def CopySingleFragmentFiles(source_data_dict, target_path):
"""
根据起始字节和长度,从磁盘中读取数据并保存为目标文件
:param source_data_dict: 包含源数据信息的字典
:param target_path: 目标文件夹路径
"""
start_byte = source_data_dict.get("start_byte")
byte_length = source_data_dict.get("length")
absolute_path = source_data_dict.get("absolute_path")
file_name = source_data_dict.get("filename")
if byte_length <= 0:
print("错误:字节长度无效")
return
if not absolute_path or not file_name:
print("错误:缺少必要的文件信息")
return
source_disk_path = ExtractVolumeLetter(absolute_path)
target_file_path = os.path.join(target_path, file_name)
try:
# 创建目标目录(如果不存在)
os.makedirs(target_path, exist_ok=True)
with open(fr"\\.\{source_disk_path}:", 'rb') as disk:
disk.seek(start_byte)
with open(target_file_path, 'wb') as f:
remaining = byte_length
CHUNK_SIZE = 1024 * 1024 # 1MB
while remaining > 0:
read_size = min(CHUNK_SIZE, remaining)
chunk = disk.read(read_size)
if not chunk:
print("警告:读取到空数据,可能已到达磁盘末尾。")
break
f.write(chunk)
remaining -= len(chunk)
print(
f"成功:已从字节偏移量 {start_byte} 读取 {byte_length} 字节,保存为 {target_file_path}")
except PermissionError:
print("错误:需要管理员权限访问磁盘设备,请以管理员身份运行此程序")
except Exception as e:
print(f"发生错误: {str(e)}")
def ReadDiskBytes(volume_letter: str, start_byte: int, length: int) -> bytes:
"""
从指定磁盘的指定起始位置读取指定长度的字节。
:param volume_letter: 盘符(如 "Y"
:param start_byte: 起始字节位置(整数)
:param length: 要读取的字节数(整数)
:return: 读取到的原始字节数据bytes
"""
if not isinstance(volume_letter, str) or len(volume_letter.strip()) != 1:
raise ValueError("drive_letter 必须是单个字母,如 'Y'")
# 构建 Windows 设备路径格式:\\.\Y:
disk_path = f"\\\\.\\{volume_letter.strip().upper()}:"
try:
with open(disk_path, "rb") as disk:
disk.seek(start_byte)
data = disk.read(length)
return data
except PermissionError:
raise PermissionError("权限不足,请以管理员身份运行程序")
except Exception as e:
raise RuntimeError(f"读取磁盘失败:{e}")
# if __name__ == "__main__":
# drive = "Y"
# start = 687685632
# size = 7163904
#
# try:
# content = ReadDiskBytes(drive, start, size)
# print(f"成功读取 {len(content)} 字节内容。前100字节为")
# print(content[:100])
# except Exception as e:
# print("错误:", e)
def CopyMultiFragmentFiles(
item: dict,
fragment_lists: dict,
target_path: str
):
"""
处理具有多个分片的文件,读取并按顺序拼接内容,最终写入磁盘。
:param item: 包含文件分片信息的字典
:param fragment_lists: 存储各文件分片内容的字典
:param target_path: 恢复文件的目标保存路径
:return: None
"""
file_name = item['filename']
extent_count = item['extent_count']
fragment_index = item['fragment_index']
start_byte = item['start_byte']
length_byte = item['length']
volume_letter = ExtractVolumeLetter(item['absolute_path'])
# 读取分片内容
fragment_content = ReadDiskBytes(volume_letter, start_byte, length_byte)
# 如果还没有为这个文件创建列表,则初始化
if file_name not in fragment_lists:
fragment_lists[file_name] = [None] * extent_count
# 将内容插入到指定位置
if fragment_index <= extent_count:
fragment_lists[file_name][fragment_index - 1] = fragment_content
print(f"已写入 {file_name} 的第 {fragment_index} 个片段。")
else:
print(f"警告:{file_name} 的 fragment_index 超出范围:{fragment_index} / {extent_count}")
# 检查是否所有分片都已加载
fragments = fragment_lists[file_name]
if None not in fragments:
full_content = b''.join(fragments)
target_file_path = os.path.join(target_path, file_name)
try:
with open(target_file_path, 'wb') as f:
f.write(full_content)
print(f"已成功恢复文件:{file_name}")
except Exception as e:
print(f"写入文件失败:{file_name},错误:{e}")
if __name__ == "__main__":
test_dict = {
'absolute_path': 'Y:\\CloudMusic\\Aaron Zigman - Main Title.mp3',
'filename': 'Aaron Zigman - Main Title.mp3',
'extent_count': 1,
'start_byte': 687685632,
'length': 7163904,
'fragment_index': 1
}
CopySingleFragmentFiles(test_dict, target_path=r"Z:\RecoveredFiles")