Compare commits

15 Commits

Author SHA1 Message Date
Burgess Leo
4d7c2e995c project done but test failed 2025-05-27 13:10:13 +08:00
Burgess Leo
491685e892 optimize db_path memory 2025-05-23 18:01:42 +08:00
Burgess Leo
d4a411ce68 almost finish 2025-05-23 13:54:31 +08:00
Burgess Leo
1fb457b67d db_path style 2025-05-22 17:27:02 +08:00
Burgess Leo
d2a3a7b5b5 \db_path style 2025-05-22 17:21:44 +08:00
Burgess Leo
3347abe02f finish fragment files copy 2025-05-22 13:03:09 +08:00
Burgess Leo
0c98dfecda finish copy files follow bytes sort 2025-05-22 09:16:37 +08:00
Burgess Leo
cd536a6bd3 add SaveFile 2025-05-20 18:01:19 +08:00
Burgess Leo
08a47c6d8a finish all table analyze 2025-05-20 16:26:58 +08:00
846c7f6beb Delete src/db_ntfs_info.db 2025-05-19 17:47:30 +08:00
Burgess Leo
deaf97607e modify .gitignore 2025-05-19 17:46:49 +08:00
Burgess Leo
697b449bff xx 2025-05-19 17:33:30 +08:00
Burgess Leo
07a4ae7a74 temp restore 2025-05-19 13:25:07 +08:00
Burgess Leo
b2e14fdbe0 restore mft_analyze 2025-05-19 11:03:36 +08:00
Burgess Leo
e167ff5d9f temp store 2025-05-19 10:21:12 +08:00
23 changed files with 2318 additions and 54 deletions

.gitignore vendored (3 lines changed)

@@ -175,4 +175,5 @@ cython_debug/
.pypirc
# Custom stuff
.idea/
src/*.db


@@ -0,0 +1,34 @@
import sqlite3
def ClearTableRecordsWithReset(db_path, table_name):
"""
Clear all records from the given table and reset its auto-increment counter.
:param db_path: str, path to the SQLite database file
:param table_name: str, name of the table to clear
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
cursor.execute(f"DELETE FROM {table_name};")
cursor.execute(f"DELETE FROM sqlite_sequence WHERE name='{table_name}';")
conn.commit()
print(f"Table [{table_name}] cleared and auto-increment ID reset")
except sqlite3.Error as e:
print(f"❌ Operation failed: {e}")
finally:
conn.close()
if __name__ == '__main__':
ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_path')
ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_node')
ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_device')
ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_config')
ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_user')
ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_group')
ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_extend_extent')
ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_extend_name')
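
Both DELETE statements above interpolate table_name straight into the SQL string, so a malformed or hostile table name could inject SQL. A minimal hardening sketch (a hypothetical ClearTableSafely helper, not part of this changeset) validates the name against sqlite_master before touching anything:

import sqlite3

def ClearTableSafely(db_path: str, table_name: str) -> None:
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        # Only proceed if table_name is a real table in this database.
        cursor.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name = ?",
            (table_name,))
        if cursor.fetchone() is None:
            raise ValueError(f"Unknown table: {table_name}")
        cursor.execute(f'DELETE FROM "{table_name}";')
        try:
            cursor.execute("DELETE FROM sqlite_sequence WHERE name = ?", (table_name,))
        except sqlite3.OperationalError:
            pass  # sqlite_sequence only exists once some table uses AUTOINCREMENT
        conn.commit()
    finally:
        conn.close()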


@@ -138,7 +138,7 @@ def CreateDBNodeTable(db_path='../src/db_ntfs_info.db', table_name='db_node'):
extent4_Length INTEGER,
-- Foreign key constraints (optional)
FOREIGN KEY(PathID) REFERENCES path_table(ID),
FOREIGN KEY(PathID) REFERENCES db_path(ID),
FOREIGN KEY(ExtendNameID) REFERENCES extname_table(ID),
FOREIGN KEY(GroupID) REFERENCES groups(ID),
FOREIGN KEY(UserID) REFERENCES users(ID)

fake_main.py Normal file (23 lines)

@@ -0,0 +1,23 @@
import itertools
from files_utils.files_save import CopySingleFragmentFiles, CopyMultiFragmentFiles
from files_utils.files_sort import GetSortFragments
from files_utils.folders_sort import ClassifyFilesAndFolders, ScanMultiFolders
fragment_lists = {}
target_path = r"Z:\test_files"
mix_test_data = [
"test-copy"
]
classify_files_and_folders = ClassifyFilesAndFolders(mix_test_data)
files_list = classify_files_and_folders["files"]
folders_files_list = ScanMultiFolders(classify_files_and_folders["folders"])
merged_list = list(itertools.chain(files_list, folders_files_list))
sort_fragments = GetSortFragments(db_path="./src/db_ntfs_info.db", files_list=merged_list)
for item in sort_fragments:
extent_count = item['extent_count']
if extent_count == 1:
CopySingleFragmentFiles(item, target_path=target_path)
elif extent_count > 1:
CopyMultiFragmentFiles(item, fragment_lists=fragment_lists, target_path=target_path)
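
Every call above eventually opens a raw \\.\X: device, which Windows only permits for elevated processes. A small guard at the top of the script (an assumption about deployment, not code from this changeset) fails fast instead of dying mid-copy with PermissionError:

import ctypes
import sys

def EnsureAdmin() -> None:
    # shell32.IsUserAnAdmin() returns nonzero when the process is elevated.
    if ctypes.windll.shell32.IsUserAnAdmin() == 0:
        sys.exit("Run this program as administrator: raw volume access requires elevation.")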

files_utils/files_save.py Normal file (131 lines)

@@ -0,0 +1,131 @@
import os
def GetVolumeLetter() -> str:
from ntfs_utils.main import volume_letter
return volume_letter
def CopySingleFragmentFiles(source_data_dict, target_path):
"""
Read data from the raw disk at the given start byte and length, and save it as the target file.
:param source_data_dict: dict describing the source data
:param target_path: target folder path
"""
start_byte = source_data_dict.get("start_byte")
byte_length = source_data_dict.get("length")
absolute_path = source_data_dict.get("absolute_path")
file_name = source_data_dict.get("filename")
if not byte_length or byte_length <= 0:
print("Error: invalid byte length")
return
if not absolute_path or not file_name:
print("Error: missing required file information")
return
source_disk_path = GetVolumeLetter()
target_file_path = os.path.join(target_path, file_name)
try:
# Create the target directory if it does not exist
os.makedirs(target_path, exist_ok=True)
with open(fr"\\.\{source_disk_path}:", 'rb') as disk:
disk.seek(start_byte)
with open(target_file_path, 'wb') as f:
remaining = byte_length
CHUNK_SIZE = 1024 * 1024 # 1MB
while remaining > 0:
read_size = min(CHUNK_SIZE, remaining)
chunk = disk.read(read_size)
if not chunk:
print("Warning: read empty data; the end of the disk may have been reached.")
break
f.write(chunk)
remaining -= len(chunk)
print(
f"Success: read {byte_length} bytes from byte offset {start_byte}, saved as {target_file_path}")
except PermissionError:
print("Error: administrator privileges are required to access the disk device; run this program as administrator")
except Exception as e:
print(f"An error occurred: {str(e)}")
def ReadDiskBytes(volume_letter: str, start_byte: int, length: int) -> bytes:
"""
Read a given number of bytes from the given disk starting at the given byte position.
:param volume_letter: volume letter (e.g. "Y")
:param start_byte: starting byte position (int)
:param length: number of bytes to read (int)
:return: the raw bytes read
"""
if not isinstance(volume_letter, str) or len(volume_letter.strip()) != 1:
raise ValueError("volume_letter must be a single letter, e.g. 'Y'")
# Build the Windows device path, e.g. \\.\Y:
disk_path = f"\\\\.\\{volume_letter.strip().upper()}:"
try:
with open(disk_path, "rb") as disk:
disk.seek(start_byte)
data = disk.read(length)
return data
except PermissionError:
raise PermissionError("Insufficient permissions; run the program as administrator")
except Exception as e:
raise RuntimeError(f"Failed to read disk: {e}")
def CopyMultiFragmentFiles(
item: dict,
fragment_lists: dict,
target_path: str
):
"""
Handle a file with multiple fragments: read each fragment, assemble them in order, and write the result to disk.
:param item: dict describing one fragment of the file
:param fragment_lists: dict holding each file's fragment contents
:param target_path: target folder for the recovered file
:return: None
"""
file_name = item['filename']
extent_count = item['extent_count']
fragment_index = item['fragment_index']
start_byte = item['start_byte']
length_byte = item['length']
volume_letter = GetVolumeLetter()
# Read this fragment's content
fragment_content = ReadDiskBytes(volume_letter, start_byte, length_byte)
# Initialize the fragment list for this file if needed
if file_name not in fragment_lists:
fragment_lists[file_name] = [None] * extent_count
# Insert the content at its position
if 1 <= fragment_index <= extent_count:
fragment_lists[file_name][fragment_index - 1] = fragment_content
print(f"Wrote fragment {fragment_index} of {file_name}.")
else:
print(f"Warning: fragment_index out of range for {file_name}: {fragment_index} / {extent_count}")
# Check whether all fragments have been loaded
fragments = fragment_lists[file_name]
if None not in fragments:
full_content = b''.join(fragments)
target_file_path = os.path.join(target_path, file_name)
try:
with open(target_file_path, 'wb') as f:
f.write(full_content)
print(f"Successfully recovered file: {file_name}")
except Exception as e:
print(f"Failed to write file {file_name}: {e}")
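
Because the caller feeds fragments in global start_byte order, a two-extent file is only flushed once both of its entries have passed through CopyMultiFragmentFiles. A minimal illustration (hypothetical fragment records, using the same dict shape that GetSortFragments produces):

fragment_lists = {}
two_extents = [
    {'absolute_path': 'Y:\\a.bin', 'filename': 'a.bin', 'extent_count': 2,
     'start_byte': 4096, 'length': 1024, 'fragment_index': 1},
    {'absolute_path': 'Y:\\a.bin', 'filename': 'a.bin', 'extent_count': 2,
     'start_byte': 65536, 'length': 512, 'fragment_index': 2},
]
for frag in two_extents:
    # a.bin is written out only when the second call completes the list.
    CopyMultiFragmentFiles(frag, fragment_lists=fragment_lists, target_path=r"Z:\out")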

files_utils/files_sort.py Normal file (148 lines)

@@ -0,0 +1,148 @@
import sqlite3
def GetFilesDBPathInfo(db_path: str = "../src/db_ntfs_info.db",
table_name: str = "db_path",
files_path=None) -> list:
"""
Look up the ID and Name fields for each of the given file paths in the specified table.
:param db_path: database file path
:param table_name: table to query
:param files_path: list of full file paths
:return: result list; each item is {'absolute_path': str, 'id': int, 'name': str}
"""
if files_path is None:
files_path = []
results = []
# Connect to the database
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
for path in files_path:
try:
# The table name is interpolated via f-string; parameter binding only works for values
sql = f"SELECT ID, Name FROM {table_name} WHERE Path = ?"
cursor.execute(sql, (path,))
row = cursor.fetchone()
if row:
results.append({
'absolute_path': path,
'id': row[0],
'name': row[1]
})
else:
print(f"No matching record found: {path}")
except Exception as e:
print(f"Query failed for {path}: {e}")
conn.close()
return results
def GetFilesDBNodeInfo(db_path: str = "../src/db_ntfs_info.db", table_name: str = "db_node",
path_records: list = None) -> list:
"""
Use the IDs from the db_path query results to look up the corresponding extent (fragment) info in the db_node table.
:param db_path: database file path
:param table_name: db_node table name
:param path_records: result list from GetFilesDBPathInfo
:return: result list with each file's fragment info
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
results = []
for record in path_records:
path_id = record['id']
absolute_path = record['absolute_path']
name = record['name']
try:
# Look up the db_node record for this PathID
cursor.execute(f"SELECT * FROM {table_name} WHERE PathID = ?", (path_id,))
row = cursor.fetchone()
if not row:
print(f"No record found for PathID={path_id} in table {table_name}")
continue
# Get the column names (for access by name)
columns = [desc[0] for desc in cursor.description]
# Build a dict for access by column name
node_data = dict(zip(columns, row))
# Get ExtentCount
extent_count = node_data.get("ExtentCount", 0)
# Parse the fragment info
fragments = []
for i in range(1, 5): # extent1 ~ extent4
loc = node_data.get(f"extent{i}_Location")
length = node_data.get(f"extent{i}_Length")
if loc is not None and length is not None and length > 0:
fragments.append({
"start_byte": loc,
"length": length
})
results.append({
"absolute_path": absolute_path,
"name": name,
"path_id": path_id,
"extent_count": extent_count,
"fragments": fragments
})
except Exception as e:
print(f"Query failed for PathID={path_id}: {e}")
conn.close()
return results
def SortFragmentsByStartByte(file_extents_list: list) -> list:
"""
Sort all files' fragments by start_byte and label each with its fragment index.
:param file_extents_list: result list from GetFilesDBNodeInfo
:return: fragment list sorted by start_byte, including file path, filename, and fragment index
"""
all_fragments = []
for file_info in file_extents_list:
absolute_path = file_info['absolute_path']
filename = file_info['name']
extent_count = file_info['extent_count']
fragments = file_info['fragments']
# Sort this file's fragments (usually already in order)
sorted_fragments = sorted(fragments, key=lambda x: x['start_byte'])
# Attach the fragment index
for idx, fragment in enumerate(sorted_fragments, start=1):
all_fragments.append({
'absolute_path': absolute_path,
'filename': filename,
'extent_count': extent_count,
'start_byte': fragment['start_byte'],
'length': fragment['length'],
'fragment_index': idx
})
# Global sort: order all fragments by start_byte
all_fragments.sort(key=lambda x: x['start_byte'])
return all_fragments
def GetSortFragments(db_path: str = "../src/db_ntfs_info.db", files_list: list = None) -> list:
path_info = GetFilesDBPathInfo(db_path=db_path, table_name="db_path", files_path=files_list)
node_info = GetFilesDBNodeInfo(db_path=db_path, table_name="db_node", path_records=path_info)
result = SortFragmentsByStartByte(node_info)
return result
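
GetSortFragments chains the three steps above: resolve paths to IDs, pull the extent records, then order every fragment globally so the raw-device reads happen in one forward sweep of the disk. A hedged usage sketch (the path is assumed to exist in db_path):

fragments = GetSortFragments(db_path="../src/db_ntfs_info.db",
                             files_list=["CloudMusic/AGA - MIZU.mp3"])
for frag in fragments:
    print(frag['filename'], frag['fragment_index'], frag['start_byte'], frag['length'])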


@@ -0,0 +1,14 @@
import subprocess
source_path = r"Y:\test-copy"
target_path = r"Z:\test-copy"
subprocess.run([
"robocopy",
source_path,
target_path,
"/E", # include subdirectories
"/R:3", # retry count
"/W:1", # wait between retries (seconds)
"/MT:16" # multithreaded copy, 16 threads
])
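
Note that robocopy does not follow the usual zero-means-success convention: exit codes 0-7 indicate varying degrees of success and only 8 or above signals failure. A guard along those lines (a sketch; capturing the result is an addition, not part of this changeset):

result = subprocess.run(["robocopy", source_path, target_path,
                         "/E", "/R:3", "/W:1", "/MT:16"])
if result.returncode >= 8:
    raise RuntimeError(f"robocopy failed with exit code {result.returncode}")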

files_utils/folders_sort.py Normal file (263 lines)

@@ -0,0 +1,263 @@
import os
import sqlite3
from files_utils.files_sort import GetFilesDBNodeInfo, SortFragmentsByStartByte
from ntfs_utils.main import volume_letter
def GetFolderID(
folder_path: str,
db_path: str = "../src/db_ntfs_info.db",
table_name: str = "db_path"
) -> int | None:
"""
Look up the database ID for the given folder path.
:param folder_path: folder path (e.g. r"CloudMusic\\")
:param db_path: database file path
:param table_name: table to query, defaults to 'db_path'
:return: the ID (int) on success, None on failure
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
# Build the SQL query with table_name
sql = f"SELECT ID FROM {table_name} WHERE Path = ?"
cursor.execute(sql, (folder_path,))
result = cursor.fetchone()
if result:
return result[0]
else:
print(f"Path not found: {folder_path} in table {table_name}")
return None
except sqlite3.Error as e:
print(f"Database operation failed: {e}")
return None
finally:
conn.close()
def GetSubPathsByParentID(
parent_id: int,
db_path: str = "../src/db_ntfs_info.db",
table_name: str = "db_path"
) -> list:
"""
Query the db_path table for the children (files/folders) of the given ParentID.
:param parent_id: parent node ID
:param db_path: database file path
:param table_name: table name
:return: list of dicts with ID, Path, and Name
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
sql = f"""
SELECT ID, Path, Name
FROM {table_name}
WHERE ParentID = ?
"""
try:
cursor.execute(sql, (parent_id,))
rows = cursor.fetchall()
except Exception as e:
print(f"Database query failed: {e}")
return []
results = []
for row in rows:
item = {
'id': row[0],
'absolute_path': row[1],
'name': row[2]
}
results.append(item)
conn.close()
return results
def GetSortFragmentsByFolderPath(db_path: str = "../src/db_ntfs_info.db", folder_path: str = None) -> list:
"""
Query the database for the fragment info of every file under the given folder path.
:param db_path: database to query
:param folder_path: absolute path of the folder
:return list: all files under the folder, ordered by fragment position
"""
parent_id = GetFolderID(folder_path=folder_path, db_path=db_path)
path_data = GetSubPathsByParentID(parent_id=parent_id, db_path=db_path)
node_data = GetFilesDBNodeInfo(path_records=path_data)
result = SortFragmentsByStartByte(node_data)
return result
# if __name__ == "__main__":
# folder_path_test = "pictures/"
# data = GetSortFragmentsByFolderPath(db_path="../src/db_ntfs_info.db", folder_path=folder_path_test)
# for item in data:
# print(item)
def ScanDirectory(root_dir, skip_system=True):
"""
Recursively scan the given directory and return paths relative to the drive letter (using '/' separators), drive letter excluded.
:param root_dir: root directory to scan
:param skip_system: whether to skip system directories (default True)
:return: list of file paths in the form relative/path/to/file.ext
"""
file_list = []
for root, dirs, files in os.walk(root_dir):
# Skip system directories
if skip_system:
dirs[:] = [d for d in dirs if not d.startswith('$') and d != "System Volume Information"]
for file in files:
full_path = os.path.join(root, file)
# Strip the drive letter
_, relative_path = os.path.splitdrive(full_path)
# Replace \ with /
relative_path = relative_path.lstrip("\\").replace("\\", "/")
file_list.append(relative_path)
return file_list
# if __name__ == "__main__":
# folder_path = r"Y:/folder1/"
# files_list = ScanDirectory(folder_path)
#
# print(f"Found {len(files_list)} files:")
# for f in files_list:
# print(f)
def ScanMultiFolders(folder_paths_list: list, skip_system: bool = True) -> list:
"""
Scan multiple root directories and return the relative paths of all files (in the form folder/file.ext).
:param folder_paths_list: root directories to scan (e.g. ["CloudMusic/", "folder1/"])
:param skip_system: whether to skip system directories
:return: list of file paths (normalized to folder/file.ext)
"""
all_files = []
for root_dir in folder_paths_list:
# Normalize the input path and make sure directories end with '/'
normalized_root_dir = root_dir.replace("\\", "/")
if not normalized_root_dir.endswith("/"):
normalized_root_dir += "/" # ensure a trailing /
full_root_path = f"{volume_letter}:/{normalized_root_dir}"
full_root_path = os.path.normpath(full_root_path)
if not os.path.exists(full_root_path):
print(f"⚠️ Path does not exist: {full_root_path}")
continue
for root, dirs, files in os.walk(full_root_path):
if skip_system:
dirs[:] = [d for d in dirs if not d.startswith('$') and d != "System Volume Information"]
for file in files:
full_file_path = os.path.join(root, file)
# Strip the drive letter
_, relative_path = os.path.splitdrive(full_file_path)
# Strip leading/trailing '\' or '/' and normalize separators
normalized_path = relative_path.strip("\\").strip("/").replace("\\", "/")
all_files.append(normalized_path)
return all_files
# if __name__ == "__main__":
# folders = [
# "CloudMusic\\",
# "folder1/"
# ]
#
# files = ScanMultiFolders(folders)
#
# print(f"Found {len(files)} files:")
# for f in files:
# print(f)
def ClassifyFilesAndFolders(paths: list) -> dict:
"""
Classify the given paths into files and directories, normalizing all separators to '/'.
Directory paths are guaranteed to end with '/'.
:param paths: list of paths (files or directories)
:return: dict with 'files' and 'folders' keys; paths use '/' separators
"""
files = []
directories = []
for path in paths:
# Normalize to '/' separators, preserving whether the path ends with '/'
normalized_path = path.replace("\\", "/")
# Did the input look like a directory (ends with '/' or '\')?
is_potential_dir = normalized_path.endswith("/")
# Build the full path to test whether it exists
full_path = f"{volume_letter}:/{normalized_path.lstrip('/')}"
full_path = os.path.normpath(full_path)
if os.path.isfile(full_path):
# If it is a file, drop any trailing /
if normalized_path.endswith("/"):
normalized_path = normalized_path.rstrip("/")
files.append(normalized_path)
elif os.path.isdir(full_path):
# If it is a directory, make sure it ends with '/'
if not normalized_path.endswith("/"):
normalized_path += "/"
directories.append(normalized_path)
else:
print(f"⚠️ Path does not exist or has unknown type: {normalized_path}")
return {
'files': files,
'folders': directories
}
# if __name__ == "__main__":
# test_paths = [
# "CloudMusic\\AGA - MIZU.mp3",
# "CloudMusic/AGA - 一.mp3",
# "CloudMusic/Aaron Zigman - Main Title.mp3",
# "CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3",
# "CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3",
# "CloudMusic/Ava Max - Sweet but Psycho.mp3",
# "CloudMusic\\",
# "folder1/",
# "CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3",
# "CloudMusic/Color Music Choir - Something Just Like This (Live).mp3"
# ]
#
# result = ClassifyFilesAndFolders(test_paths)
#
# print("✅ Files:")
# for f in result['files']:
# print(f)
#
# print("\n📁 Folders:")
# for d in result['folders']:
# print(d)


@@ -8,9 +8,10 @@ def GetNTFSBootInfo(volume_letter):
- Bytes per sector
- Sectors per cluster
- Cluster size (bytes)
- $MFT starting cluster number (LCN)
Parameters:
volume_letter: volume letter string, e.g. 'C'
volume_letter: str, volume letter string, e.g. 'C'
Returns:
dict containing the information above
@@ -62,10 +63,15 @@ def GetNTFSBootInfo(volume_letter):
# Compute the cluster size
cluster_size = bytes_per_sector * sectors_per_cluster
# Parse the $MFT starting cluster number (LCN): offset 0x30, QWORD (8 bytes)
mft_lcn_bytes = buffer[0x30:0x38]
mft_lcn = int.from_bytes(mft_lcn_bytes, byteorder='little', signed=False)
return {
"BytesPerSector": bytes_per_sector,
"SectorsPerCluster": sectors_per_cluster,
"ClusterSize": cluster_size
"ClusterSize": cluster_size,
"MftPosition": mft_lcn
}
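
To make the arithmetic concrete: the QWORD at boot-sector offset 0x30 is the logical cluster number (LCN) of $MFT, so its byte offset on the volume is MftPosition * ClusterSize. A hedged worked example (the byte values are illustrative, not from a real volume):

# Suppose buffer[0x30:0x38] == b'\x00\x00\x0c\x00\x00\x00\x00\x00'
mft_lcn = int.from_bytes(b'\x00\x00\x0c\x00\x00\x00\x00\x00', byteorder='little')  # 786432
cluster_size = 512 * 8          # 4096 bytes with the common 512 B/sector, 8 sectors/cluster
mft_byte_offset = mft_lcn * cluster_size  # 3221225472, exactly 3 GiB into the volume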

ntfs_utils/db_node.py Normal file (266 lines)

@@ -0,0 +1,266 @@
import hashlib
import os
import sqlite3
from datetime import datetime
from ntfs_utils.mft_analyze import GetFile80hPattern, GetFragmentData, ExtractSequenceHexValues, hex_list_to_int
from ntfs_utils.main import volume_letter
# Utility: get a file's extension
def GetFileExtension(name: str) -> str:
parts = name.rsplit('.', 1)
return parts[1].lower() if len(parts) > 1 else ""
# Get ExtendNameID based on the filename suffix
def GetExtendNameId(name: str, cursor: sqlite3.Cursor) -> int:
ext = GetFileExtension(name)
if not ext:
return 0
cursor.execute("SELECT ID FROM db_extend_name WHERE ExtendName = ?", (ext,))
result = cursor.fetchone()
return result[0] if result else 0
# Get DirLayer (path depth)
def GetDirLayer(path: str) -> int:
path = path.strip()
if not path or path == "\\":
return 0
return path.count("\\") - 1
# Get GroupID (defaults to the first row)
def GetFirstGroupId(cursor: sqlite3.Cursor) -> int:
cursor.execute("SELECT ID FROM db_group ORDER BY ID LIMIT 1")
result = cursor.fetchone()
return result[0] if result else 0
# Get UserID (defaults to the first row)
def GetFirstUserId(cursor: sqlite3.Cursor) -> int:
cursor.execute("SELECT ID FROM db_user ORDER BY ID LIMIT 1")
result = cursor.fetchone()
return result[0] if result else 0
def GetFilesTime(file_path):
"""
Get the given file's creation, modification, access, and permission-change times.
st_atime: last access time (FileAccessTime)
st_mtime: last content-modification time (FileModifyTime)
st_ctime: metadata change time; on Windows this is the file creation time (FileCreateTime)
Parameters:
file_path (str): absolute path of the file
Returns:
dict: FileCreateTime, FileModifyTime, FileAccessTime, FileAuthTime as strings,
or "default" when a value cannot be obtained
"""
if not os.path.exists(file_path):
return {
"FileCreateTime": "default",
"FileModifyTime": "default",
"FileAccessTime": "default",
"FileAuthTime": "default"
}
try:
stat_info = os.stat(file_path)
def ts_to_str(timestamp):
return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
create_time = ts_to_str(stat_info.st_ctime)
modify_time = ts_to_str(stat_info.st_mtime)
access_time = ts_to_str(stat_info.st_atime)
# Permission-change time (may not apply on Windows)
try:
auth_time = ts_to_str(getattr(stat_info, 'st_birthtime', stat_info.st_ctime))
except Exception:
auth_time = "default"
return {
"FileCreateTime": create_time,
"FileModifyTime": modify_time,
"FileAccessTime": access_time,
"FileAuthTime": auth_time
}
except Exception as e:
print(f"❌ Failed to get file times: {e}")
return {
"FileCreateTime": "default",
"FileModifyTime": "default",
"FileAccessTime": "default",
"FileAuthTime": "default"
}
# Get the device ID (first record in db_device)
def GetDeviceId(cursor: sqlite3.Cursor) -> int:
cursor.execute("SELECT ID FROM db_device ORDER BY ID LIMIT 1")
result = cursor.fetchone()
return result[0] if result else 0
# Get the file size (parsed from the 80h attribute)
def GetFileSize(file80h_pattern):
if not file80h_pattern or not isinstance(file80h_pattern, list):
return 0
if file80h_pattern[0].get('is_resident'):
fragments = GetFragmentData(file80h_pattern)
if fragments and len(fragments) > 0:
return fragments[0].get('byte_length', 0)
else:
sequence_list = ExtractSequenceHexValues(file80h_pattern)
if len(sequence_list) < 64:
raise ValueError("Sequence too short to parse the file size")
size_list = sequence_list[56:64]
size = hex_list_to_int(size_list)
return size
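
The non-resident branch reads bytes 56-63 of the attribute as a little-endian quadword. A quick hedged check of hex_list_to_int with illustrative input:

# ['00', '10', '00', '00', '00', '00', '00', '00'] reversed is 0x0000000000001000
assert hex_list_to_int(['00', '10', '00', '00', '00', '00', '00', '00']) == 4096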
# Get the file content hash (pseudo data: hashes the path string, not the content)
def GetFileHash(full_path: str) -> str:
return hashlib.sha256(full_path.encode()).hexdigest()
# Get a fragment's location and length
def GetFragmentLocation(fragment):
return fragment.get('starting_byte', 0)
def GetFragmentLength(fragment):
return fragment.get('byte_length', 0)
# Main routine: import db_path data into db_node
def InsertNodeDataToDB(db_path='../src/db_ntfs_info.db', table_name='db_node', batch_size=20):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
if len(volume_letter) == 1:
volume_root = f"{volume_letter}:\\"
elif volume_letter.endswith(':'):
volume_root = f"{volume_letter}\\"
else:
volume_root = f"{volume_letter}:\\" # accepts "Y" or "Y:" input
print(f"🔍 Processing volume root: {volume_root}")
group_id = GetFirstGroupId(cursor)
user_id = GetFirstUserId(cursor)
device_id = GetDeviceId(cursor)
cursor.execute("SELECT ID, Path, Name, ParentID FROM db_path")
rows = cursor.fetchall()
insert_fields = [
'PathID', 'ParentID', 'NameHash', 'PathHash',
'ExtendNameID', 'DirLayer', 'GroupID', 'UserID',
'FileCreateTime', 'FileModifyTime', 'FileAccessTime', 'FileAuthTime',
'FileSize', 'FileMode', 'FileHash', 'ExtentCount',
# extent fields
"extent1_DeviceID", "extent1_Location", "extent1_Length",
"extent2_DeviceID", "extent2_Location", "extent2_Length",
"extent3_DeviceID", "extent3_Location", "extent3_Length",
"extent4_DeviceID", "extent4_Location", "extent4_Length"
]
insert_placeholders = ', '.join('?' * len(insert_fields))
insert_sql = f"INSERT INTO {table_name} ({', '.join(insert_fields)}) VALUES ({insert_placeholders})"
batch = []
for row in rows:
path_id, relative_path, name, parent_id = row
full_path = os.path.join(volume_root, relative_path)
# Skip if this PathID already exists
cursor.execute("SELECT COUNT(*) FROM db_node WHERE PathID = ?", (path_id,))
exists = cursor.fetchone()[0]
if exists > 0:
print(f"⚠️ PathID {path_id} already exists, skipping insert")
continue
try:
file80h_pattern = GetFile80hPattern(full_path)
fragments = GetFragmentData(file80h_pattern)
extent_count = min(len(fragments), 4)
except Exception as e:
print(f"⚠️ Failed to get ExtentCount, defaulting to 0: {e}")
fragments = []
extent_count = 0
# Compute the derived fields
name_hash = hashlib.sha256(name.encode()).hexdigest()
dir_layer = GetDirLayer(relative_path)
extend_name_id = GetExtendNameId(name, cursor)
try:
file_size = GetFileSize(file80h_pattern)
except Exception as e:
print(f"⚠️ Failed to get the file size, defaulting to 0: {e}")
file_size = 0
file_hash = GetFileHash(full_path)
# Get the timestamps
file_times = GetFilesTime(full_path)
create_time = file_times["FileCreateTime"]
modify_time = file_times["FileModifyTime"]
access_time = file_times["FileAccessTime"]
auth_time = file_times["FileAuthTime"]
# Look up the PathHash
cursor.execute("SELECT PathHash FROM db_path WHERE ID = ?", (path_id,))
path_hash_result = cursor.fetchone()
path_hash = path_hash_result[0] if path_hash_result else ""
# Build the extent fields
extent_data = []
for i in range(4): # at most 4 extents
if i < len(fragments):
frag = fragments[i]
location = GetFragmentLocation(frag)
length = GetFragmentLength(frag)
extent_data.extend([device_id, location, length])
else:
extent_data.extend([None, None, None])
# Build the row to insert
values = [
path_id, parent_id, name_hash, path_hash,
extend_name_id, dir_layer, group_id, user_id,
create_time, modify_time, access_time, auth_time,
file_size, 'default', file_hash, extent_count,
*extent_data
]
batch.append(values)
# Batch insert
if len(batch) >= batch_size:
cursor.executemany(insert_sql, batch)
conn.commit()
print(f"✅ Committed a batch of {len(batch)} records to {table_name}")
batch.clear()
# Insert the remaining partial batch
if batch:
cursor.executemany(insert_sql, batch)
conn.commit()
print(f"✅ Committed the final batch of {len(batch)} records to {table_name}")
conn.close()
print(f"✅ Data successfully inserted into {table_name}")
if __name__ == "__main__":
InsertNodeDataToDB()


@@ -23,27 +23,26 @@ def ShouldSkipPath(path: str) -> bool:
return False
def ScanVolume(volume_letter: str):
def ScanVolume(volume_letter: str) -> list:
"""
Fully scan all files and directories on the given volume, skipping NTFS metafiles and system folders,
and assign a ParentID to each node.
Returns:
list of dict: records describing each file/directory
"""
root_path = f"{volume_letter.upper()}:\\"
if not os.path.exists(root_path):
raise ValueError(f"Volume {root_path} does not exist")
result = []
path_to_id = {} # maps each path to its database ID
counter = 1 # simulated auto-increment ID
path_to_id = {} # path -> ID map
counter = 1
for root, dirs, files in os.walk(root_path, topdown=True, onerror=None, followlinks=False):
# Filter out directories that should be skipped
dirs[:] = [d for d in dirs if not ShouldSkipPath(os.path.join(root, d))]
for entry in files + dirs:
entries = files + dirs
for entry in entries:
full_path = os.path.join(root, entry)
if ShouldSkipPath(full_path):
@@ -61,21 +60,32 @@ def ScanVolume(volume_letter: str):
name = entry
# ✅ Fix: hash the Path field
path_hash = GenerateHash(full_path)
# Strip the drive letter and normalize the path format
_, relative_path = os.path.splitdrive(full_path)
relative_path = relative_path.lstrip("\\").rstrip("\\")
if os.path.isdir(full_path) and not relative_path.endswith("/"):
relative_path += "/"
relative_path = relative_path.replace("\\", "/")
path_hash = GenerateHash(relative_path)
# Compute ContentSize in KB (small files show as at least 1 KB)
content_size = bytes_size // 1024
if content_size == 0 and bytes_size > 0:
content_size = 1
# Get the parent directory path
parent_path = os.path.dirname(full_path)
parent_id = path_to_id.get(parent_path, 0) # defaults to 0 (the root may not be recorded)
_, parent_relative_path = os.path.splitdrive(parent_path)
parent_relative_path = parent_relative_path.lstrip("\\").rstrip("\\")
if os.path.isdir(parent_path) and not parent_relative_path.endswith("/"):
parent_relative_path += "/"
parent_relative_path = parent_relative_path.replace("\\", "/")
parent_id = path_to_id.get(parent_relative_path, 0)
item = {
"ID": counter,
"Path": full_path,
"Path": relative_path,
"Name": name,
"PathHash": path_hash,
"IsDir": is_dir,
@@ -83,25 +93,27 @@ def ScanVolume(volume_letter: str):
"ContentSize": content_size
}
result.append(item)
path_to_id[full_path] = counter
yield item # yield each record as it is produced
path_to_id[relative_path] = counter
counter += 1
except Exception as e:
print(f"⚠️ Skipping path {full_path}, error: {e}")
return result
def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_path', batch_size=20):
def InsertPathDataToDB(data_generator, db_path='../src/db_ntfs_info.db', table_name='db_path', batch_size=20):
"""
Write the scan results to the database in batches.
Stream into the database: insert while scanning.
:param data_generator: an iterable (e.g. a generator)
:param db_path: database path
:param table_name: table name
:param batch_size: number of records per commit
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
# Create the table if it does not exist
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
ID INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -111,23 +123,20 @@ def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_pa
IsDir INTEGER NOT NULL CHECK(IsDir IN (0, 1)),
ParentID INTEGER,
ContentSize INTEGER,
FOREIGN KEY(ParentID) REFERENCES {table_name}(ID)
);
"""
cursor.execute(create_table_sql)
# Insert statement (ignore duplicate PathHash)
insert_sql = f"""
INSERT OR IGNORE INTO {table_name}
(Path, Name, PathHash, IsDir, ParentID, ContentSize)
VALUES (?, ?, ?, ?, ?, ?)
"""
total_inserted = 0
batch = []
for item in data:
for item in data_generator:
batch.append((
item['Path'],
item['Name'],
@@ -140,39 +149,34 @@ def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_pa
if len(batch) >= batch_size:
cursor.executemany(insert_sql, batch)
conn.commit()
total_inserted += cursor.rowcount
print(f"✅ Committed a batch of {len(batch)} rows")
batch.clear()
# Insert the remaining rows
# Commit the remaining partial batch
if batch:
cursor.executemany(insert_sql, batch)
conn.commit()
total_inserted += cursor.rowcount
print(f"✅ Committed the final batch of {len(batch)} rows")
print(f"✅ Inserted {total_inserted} records in total.")
except Exception as e:
print(f"❌ Insert failed: {e}")
conn.rollback()
finally:
conn.close()
# Example main entry
def main():
volume_letter = "Z"
def DBPathMain(volume_letter: str):
print(f"🔍 Starting full scan of volume {volume_letter}:\\ ...")
scanned_data = ScanVolume(volume_letter)
print(f"📊 Scanned {len(scanned_data)} valid records, starting import...")
InsertPathDataToDB(scanned_data)
# Get the generator
generator = ScanVolume(volume_letter)
print(f"📊 Importing batch by batch...")
InsertPathDataToDB(generator)
print("✅ Full scan and import complete")
if __name__ == "__main__":
main()
DBPathMain(volume_letter="Y")
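
One caveat with the streaming pipeline: ScanVolume's path_to_id map uses a simulated counter, which only matches the real AUTOINCREMENT IDs as long as INSERT OR IGNORE never drops a row. A hedged per-row variant (a sketch, not this changeset's code) keys the map on the IDs the database actually assigned:

for item in data_generator:
    cursor.execute(insert_sql, (item['Path'], item['Name'], item['PathHash'],
                                item['IsDir'], item['ParentID'], item['ContentSize']))
    if cursor.rowcount == 1:  # the row was actually inserted, not ignored
        path_to_id[item['Path']] = cursor.lastrowid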


@@ -1,14 +1,15 @@
from db_config import GetNTFSBootInfo, InsertInfoToDBConfig
from db_device import ScanSpecialVolumes, InsertVolumesToDB
from db_extend_name import InsertExtensionsToDB
from db_group import InsertGroupToDB
from db_path import GenerateHash, ShouldSkipPath, ScanVolume, InsertPathDataToDB
from db_user import InsertUserToDB
from ntfs_utils.db_config import GetNTFSBootInfo, InsertInfoToDBConfig
from ntfs_utils.db_device import ScanSpecialVolumes, InsertVolumesToDB
from ntfs_utils.db_extend_name import InsertExtensionsToDB
from ntfs_utils.db_group import InsertGroupToDB
# from ntfs_utils.db_node import InsertNodeDataToDB
from ntfs_utils.db_path import DBPathMain
from ntfs_utils.db_user import InsertUserToDB
volume_letter = 'Y'
def main():
volume_letter = 'Z'
# Initialize the db_config table
config_data = GetNTFSBootInfo(volume_letter)
InsertInfoToDBConfig(config_data)
@@ -25,10 +26,6 @@ def main():
group_name_list = ["Copier"]
InsertGroupToDB(group_name_list)
# Initialize the db_path table
scanned_data = ScanVolume(volume_letter)
InsertPathDataToDB(scanned_data)
# Initialize the db_extend_name table
common_extensions = [
"txt", "log", "csv", "xls", "xlsx", "doc", "docx",
@@ -40,6 +37,12 @@ def main():
count = InsertExtensionsToDB(common_extensions)
print(f"Inserted {count} new extensions.")
# Initialize the db_path table
DBPathMain(volume_letter=volume_letter)
# Initialize the db_node table
# InsertNodeDataToDB(volume_letter)
if __name__ == '__main__':
main()

ntfs_utils/mft_analyze.py Normal file (408 lines)

@@ -0,0 +1,408 @@
import os
import pytsk3
from ntfs_utils.db_config import GetNTFSBootInfo
def find_file_mft_entry(fs, target_path):
"""
Find a file's MFT entry number in an NTFS file system from its path
"""
def traverse_directory(inode, path_components):
if not path_components:
return inode
dir_name = path_components[0].lower()
try:
directory = fs.open_dir(inode=inode)
except Exception as e:
print(f"Error opening directory with inode {inode}: {e}")
return None
for entry in directory:
if not entry.info or not entry.info.name or not entry.info.meta:
continue
name = entry.info.name.name.decode('utf-8', errors='ignore').lower()
meta = entry.info.meta
# Match the directory or file name at the current level
if name == dir_name:
if len(path_components) == 1:
# This is the target file/directory
return meta.addr
elif meta.type == pytsk3.TSK_FS_META_TYPE_DIR:
# Recurse into the subdirectory
next_inode = entry.info.meta.addr
result = traverse_directory(next_inode, path_components[1:])
if result:
return result
return None
# Split the path
path_parts = target_path.strip("\\").lower().split("\\")
root_inode = fs.info.root_inum # MFT entry of the root directory
return traverse_directory(root_inode, path_parts)
def GetFileMftEntry(file_path):
"""
Get the MFT entry number of the given file on NTFS
"""
if not os.path.exists(file_path):
raise FileNotFoundError(f"File not found: {file_path}")
# Get the drive letter
drive_letter = os.path.splitdrive(file_path)[0][0]
device = f"\\\\.\\{drive_letter}:"
# print(f"Opening device: {device}")
try:
img = pytsk3.Img_Info(device)
fs = pytsk3.FS_Info(img)
except Exception as e:
raise RuntimeError(f"Failed to open device '{device}': {e}")
# Build the relative path
abs_path = os.path.abspath(file_path)
root_path = f"{drive_letter}:\\"
rel_path = os.path.relpath(abs_path, root_path).replace("/", "\\")
# print(f"Looking up MFT entry for: {rel_path}")
mft_entry = find_file_mft_entry(fs, rel_path)
# print(f"MFT Entry: {mft_entry}")
if mft_entry is None:
raise RuntimeError("Could not find MFT entry for the specified file.")
return mft_entry
def CalculateFileMftStartSector(mft_entry, volume_letter="Z"):
"""
Compute the starting sector of a file's MFT entry from its MFT entry number.
Parameters:
mft_entry (int): the file's MFT entry number (i.e. its inode)
volume_letter (str): volume whose boot sector supplies the $MFT position (default "Z")
Returns:
int: starting sector of the file's MFT entry
"""
if mft_entry < 0:
raise ValueError("MFT entry number cannot be negative")
# Get the NTFS boot information
config_data = GetNTFSBootInfo(volume_letter)
# Compute the starting sector: MftPosition clusters * 8 sectors each, plus 2 sectors per 1024-byte MFT entry
start_sector = config_data["MftPosition"] * 8 + mft_entry * 2
if start_sector < 0:
raise ValueError("Starting sector cannot be negative")
# print(f"Starting sector of the file's MFT entry: {start_sector}")
return start_sector
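
The constants fold in the geometry from GetNTFSBootInfo: MftPosition is a cluster number (8 sectors per cluster here), and each 1024-byte MFT entry spans 2 sectors of 512 bytes. A hedged worked example with illustrative numbers:

# MftPosition = 786432 clusters, mft_entry = 41:
start_sector = 786432 * 8 + 41 * 2  # 6291456 + 82 = 6291538
start_byte = start_sector * 512     # 3221267456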
def Get80hPattern(sector_number, volume_letter="Z"):
"""
Read an NTFS sector and search it for 80h ($DATA) attribute patterns.
Parameters:
sector_number (int): sector number to read
volume_letter (str): volume letter of the disk device (default "Z")
Returns:
list: all matches, each element being:
{
'start_byte': starting byte of the file's MFT entry (StartSector * 512),
'offset': offset of this 80h attribute within the sector,
'sequence': list of raw data groups (each group formatted as "xx xx xx ..."),
'is_resident': whether the attribute is resident,
'total_groups': number of groups actually read,
'attribute_length': total attribute length in bytes
}
"""
drive_path = fr"\\.\{volume_letter}:"
SECTOR_SIZE = 512
GROUP_SIZE = 8 # 8 bytes per group
MATCH_BYTE = 0x80 # leading byte to match
results = []
try:
with open(drive_path, 'rb') as disk:
disk.seek(sector_number * SECTOR_SIZE)
sector_data = disk.read(SECTOR_SIZE)
if not sector_data or len(sector_data) < GROUP_SIZE:
print(f"Error: failed to read sector {sector_number}")
return results
groups = [sector_data[i:i + GROUP_SIZE] for i in range(0, len(sector_data), GROUP_SIZE)]
for i in range(len(groups)):
current_group = groups[i]
if len(current_group) < GROUP_SIZE:
continue
if current_group[0] == MATCH_BYTE:
# Bytes 4-7 hold the attribute length (little-endian DWORD)
if i + 1 >= len(groups):
print(f"Warning: fewer than two groups remain after the current one, skipping offset {i * GROUP_SIZE:04X}h")
continue
attribute_length_bytes = b''.join([
groups[i][4:8], # bytes 4-7 of the first group
groups[i + 1][0:4] if i + 1 < len(groups) else b'\x00\x00\x00\x00'
])
attribute_length = int.from_bytes(attribute_length_bytes[:4], byteorder='little')
# Number of groups to read (rounded up to whole 8-byte groups)
total_groups = (attribute_length + GROUP_SIZE - 1) // GROUP_SIZE
end_idx = i + total_groups
if end_idx > len(groups):
print(f"Warning: attribute out of bounds, skipping offset {i * GROUP_SIZE:04X}h")
continue
raw_sequence = groups[i:end_idx]
# Format each group of bytes as a string like "31 7a 00 ee 0b 00 00 00"
formatted_sequence = [' '.join(f"{byte:02x}" for byte in group) for group in raw_sequence]
# Residency check: lowest bit of the first byte of the second group
is_resident = False
if len(raw_sequence) >= 2:
second_group = raw_sequence[1]
is_resident = (second_group[0] & 0x01) == 0x00
result_entry = {
'start_byte': sector_number * SECTOR_SIZE, # starting byte of the file's MFT entry
'offset': i * GROUP_SIZE,
'sequence': formatted_sequence,
'is_resident': is_resident,
'total_groups': total_groups,
'attribute_length': attribute_length
}
results.append(result_entry)
# resident_str = "resident" if is_resident else "non-resident"
# print(f"\nFound a {resident_str} 80h attribute at offset {i * GROUP_SIZE:04X}h:")
# print(f"Total attribute length: {attribute_length} bytes -> reading {total_groups} groups:")
# for j, group in enumerate(formatted_sequence):
# print(f"Group {j + 1}: {group}")
#
# print(f"\nFound {len(results)} matching sequences")
return results
except PermissionError:
print("Error: administrator privileges are required to access the disk device")
except Exception as e:
print(f"An error occurred: {str(e)}")
return results
def GetFile80hPattern(file_path):
volume_letter = file_path.split(':')[0]
try:
mft_entry_value = GetFileMftEntry(file_path)
StartSector = CalculateFileMftStartSector(mft_entry_value, volume_letter)
# print(f"File info and 80h attribute content")
# print(Get80hPattern(StartSector, volume_letter))
file80h_pattern = Get80hPattern(StartSector, volume_letter)
return file80h_pattern
except Exception as e:
print(f"❌ Error: {e}")
return None
# if __name__ == '__main__':
# data = GetFile80hPattern(r"Z:\hello.txt")
# print(data)
def ExtractSequenceHexValues(file80h_pattern):
"""
Extract every hex string from the 'sequence' entries of the given structure and merge them into one flat list.
Parameters:
file80h_pattern (list): list of dicts, each containing a 'sequence' key
Returns:
list: all sequence values merged into a single list
"""
sequence_list = []
for entry in file80h_pattern:
if 'sequence' in entry:
# Split each hex string on spaces and merge into the result list
for hex_str in entry['sequence']:
# Split the string and append the pieces
sequence_list.extend(hex_str.split())
return sequence_list
def ExportDataRunList(data_run_list):
"""
Split data_run_list into separate, independent data run fragments.
"""
result = []
pos = 0
while pos < len(data_run_list):
current_byte = data_run_list[pos]
if current_byte == '00':
break
try:
header = int(current_byte, 16)
# NTFS run header: low nibble = size of the length field, high nibble = size of the offset field
len_bytes = header & 0x0F
offset_bytes = (header >> 4) & 0x0F
run_length = 1 + offset_bytes + len_bytes
if pos + run_length > len(data_run_list):
print(f"⚠️ Data out of bounds, stopping parse")
break
fragment = data_run_list[pos: pos + run_length]
result.append(fragment)
pos += run_length
except Exception as e:
print(f"❌ Failed to parse data run at position {pos}: {e}")
pos += 1 # skip one byte and keep parsing
return result
def hex_list_to_int(lst, byteorder='little'):
"""
Convert a list of hex strings to an integer (little-endian supported)
"""
if byteorder == 'little':
lst = list(reversed(lst))
return int(''.join(f"{int(b, 16):02x}" for b in lst), 16)
def parse_data_run(data_run, previous_cluster=0, cluster_size=512):
"""
Parse a single NTFS data run and return its starting byte, ending byte, and byte length.
Parameters:
data_run (list): hex strings of the data run
previous_cluster (int): the previous run's starting cluster (offsets are relative to it)
cluster_size (int): cluster size (default 512 bytes)
Returns:
dict: starting byte, ending byte, length, and related info
"""
if not data_run or data_run[0] == '00':
return None
header = int(data_run[0], 16)
# NTFS run header: the low nibble is the size of the length field (which comes first), the high nibble the size of the offset field
len_bytes = header & 0x0F
offset_bytes = (header >> 4) & 0x0F
if len(data_run) < 1 + offset_bytes + len_bytes:
print(f"⚠️ Not enough data to parse the data run")
return None
# Extract the length field, then the offset field
length_data = data_run[1:1 + len_bytes]
offset_data = data_run[1 + len_bytes:1 + len_bytes + offset_bytes]
# Little-endian hex list to int
def hex_list_to_int(lst):
return int(''.join(f"{int(b, 16):02x}" for b in reversed(lst)), 16)
offset = hex_list_to_int(offset_data)
run_length = hex_list_to_int(length_data)
# Compute the starting cluster
starting_cluster = previous_cluster + offset
ending_cluster = starting_cluster + run_length - 1
# Convert to byte offsets
sectors_per_cluster = 8
bytes_per_sector = cluster_size # the cluster_size parameter is used here as bytes per sector
starting_byte = starting_cluster * sectors_per_cluster * bytes_per_sector
byte_length = run_length * sectors_per_cluster * bytes_per_sector
ending_byte = starting_byte + byte_length - 1
return {
"starting_byte": starting_byte,
"ending_byte": ending_byte,
"byte_length": byte_length,
"starting_cluster": starting_cluster,
"run_length_clusters": run_length
}
def ParseMultipleDataRuns(fragments, cluster_size=512):
"""
Parse multiple data run fragments in batch and return their byte-offset info.
Parameters:
fragments (list): list of data run string lists
cluster_size (int): cluster size (default 512)
Returns:
list: one dict of byte-offset info per fragment
"""
results = []
previous_starting_cluster = 0
for fragment in fragments:
result = parse_data_run(fragment, previous_starting_cluster, cluster_size)
if result:
results.append(result)
previous_starting_cluster = result["starting_cluster"]
return results
def GetFragmentData(file80h_pattern):
if not file80h_pattern or not isinstance(file80h_pattern, list):
return []
if file80h_pattern[0].get('is_resident'):
start_byte = file80h_pattern[0].get('start_byte')
offset = file80h_pattern[0].get('offset')
content_start = file80h_pattern[0].get('sequence')[2]
content_start_list = content_start.split()
content_len = content_start_list[::-1][4:8]
content_offset = content_start_list[::-1][:4]
content_len_str = ''.join(content_len)
content_len_decimal_value = int(content_len_str, 16)
content_offset_str = ''.join(content_offset)
content_offset_decimal_value = int(content_offset_str, 16)
file_offset = start_byte + offset + content_offset_decimal_value
return [{
'starting_byte': file_offset,
'byte_length': content_len_decimal_value
}]
else:
sequence_list = ExtractSequenceHexValues(file80h_pattern)
data_run_offset = sequence_list[32:34][::-1]
data_run_offset_str = ''.join(data_run_offset)
data_run_offset_decimal_value = int(data_run_offset_str, 16)
data_run_list = sequence_list[data_run_offset_decimal_value:]
fragments = ExportDataRunList(data_run_list)
results = ParseMultipleDataRuns(fragments)
return results
# if __name__ == '__main__':
# arri80_data = GetFile80hPattern(r"Z:\hello.txt")
# data = GetFragmentData(arri80_data)
# print(data)
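
For reference, decoding the first sample run '31 7a 00 ee 0b' under the standard NTFS layout (low nibble of the header = length-field size, and the length field comes first): header 0x31 gives a 1-byte length field and a 3-byte offset field, so the run is 0x7a = 122 clusters long and starts at cluster 0x0bee00 = 781824 (little-endian 00 ee 0b). A hedged check against parse_data_run as written above:

run = ['31', '7a', '00', 'ee', '0b']
parsed = parse_data_run(run, previous_cluster=0, cluster_size=512)
assert parsed['run_length_clusters'] == 122
assert parsed['starting_cluster'] == 781824
assert parsed['byte_length'] == 122 * 8 * 512  # 499712 bytes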

Binary file not shown.


@@ -0,0 +1,139 @@
def extract_data_run_fragments(data_run):
"""
Split the data runs contained in data_run into separate list fragments.
Parameters:
data_run (list): list of hex strings representing the data run content
Returns:
list: each element is a list representing one data run
"""
result = []
pos = 0
while pos < len(data_run):
current_byte = data_run[pos]
if current_byte == '00':
# Stop parsing at an empty run
break
try:
header = int(current_byte, 16)
# NTFS run header: low nibble = size of the length field, high nibble = size of the offset field
len_bytes = header & 0x0F
offset_bytes = (header >> 4) & 0x0F
if len_bytes == 0 or offset_bytes == 0:
print(f"⚠️ Invalid field sizes, stopping at position {pos}")
break
# Total size of the current data run
run_length = 1 + offset_bytes + len_bytes
# Slice out the current data run
fragment = data_run[pos: pos + run_length]
result.append(fragment)
# Advance the cursor
pos += run_length
except Exception as e:
print(f"❌ Parse failed at position {pos}: {e}")
break
return result
def hex_list_to_int(lst, byteorder='little'):
"""
Convert a list of hex strings to an integer (little-endian supported)
"""
if byteorder == 'little':
lst = list(reversed(lst))
return int(''.join(f"{int(b, 16):02x}" for b in lst), 16)
def parse_data_run(data_run, previous_cluster=0):
"""
Parse a single NTFS data run and return its starting and ending cluster numbers.
Parameters:
data_run (list): hex strings of the data run
previous_cluster (int): the previous run's starting cluster (offsets are relative to it)
Returns:
dict: starting cluster, ending cluster, run length, and related info
"""
if not data_run or data_run[0] == '00':
return None
header = int(data_run[0], 16)
# NTFS run header: low nibble = size of the length field, high nibble = size of the offset field
len_bytes = header & 0x0F
offset_bytes = (header >> 4) & 0x0F
# Extract the length field first, then the offset field (that is the on-disk order)
length_data = data_run[1:1 + len_bytes]
offset_data = data_run[1 + len_bytes:1 + len_bytes + offset_bytes]
# Parse the length and offset
offset = hex_list_to_int(offset_data, 'little')
run_length = hex_list_to_int(length_data, 'little')
# Compute the starting cluster (absolute for the first run, relative afterwards)
starting_cluster = previous_cluster + offset
ending_cluster = starting_cluster + run_length - 1
return {
"starting_cluster": starting_cluster,
"ending_cluster": ending_cluster,
"run_length": run_length
}
def parse_multiple_data_runs(fragments):
"""
Parse multiple data run fragments in batch, supporting relative offsets.
Parameters:
fragments (list): list of data run string lists, e.g.:
[
['31', '7a', '00', 'ee', '0b'],
['22', '29', '06', 'bb', '00'],
...
]
Returns:
list: one dict per fragment containing its parse results
"""
results = []
previous_starting_cluster = 0
for fragment in fragments:
result = parse_data_run(fragment, previous_starting_cluster)
if result:
results.append(result)
previous_starting_cluster = result["starting_cluster"]
return results
data_run = [
'31', '7a', '00', 'ee', '0b',
'22', '29', '06', 'bb', '00',
'32', '7a', '02', 'ee', '00', '00',
'00', 'a0', 'f8', 'ff', 'ff', 'ff', 'ff', 'ff'
]
# Step 1: extract all valid fragments
fragments = extract_data_run_fragments(data_run)
print("Extracted fragments:")
for i, frag in enumerate(fragments):
print(f"Fragment {i + 1}: {frag}")
# Step 2: parse the fragments in batch
results = parse_multiple_data_runs(fragments)
print("\nParse results:")
for i, res in enumerate(results):
print(f"Fragment {i + 1}: {res}")

test/fake_main.py Normal file (36 lines)

@@ -0,0 +1,36 @@
from files_save import CopyMultiFragmentFiles, CopySingleFragmentFiles
target_path = r"Z:\Recovered"
# Holds each file's list of fragment contents
fragment_lists = {}
test_file_sort = [{'absolute_path': 'Y:\\CloudMusic\\AGA - 一.mp3', 'filename': 'AGA - 一.mp3', 'extent_count': 2,
'start_byte': 23162880, 'length': 69632, 'fragment_index': 1},
{'absolute_path': 'Y:\\CloudMusic\\Aaron Zigman - Main Title.mp3',
'filename': 'Aaron Zigman - Main Title.mp3', 'extent_count': 1, 'start_byte': 687685632,
'length': 7163904, 'fragment_index': 1},
{'absolute_path': 'Y:\\CloudMusic\\AGA - MIZU.mp3', 'filename': 'AGA - MIZU.mp3', 'extent_count': 1,
'start_byte': 694849536, 'length': 8126464, 'fragment_index': 1},
{'absolute_path': 'Y:\\CloudMusic\\AGA - 一.mp3', 'filename': 'AGA - 一.mp3', 'extent_count': 2,
'start_byte': 702976000, 'length': 10870784, 'fragment_index': 2},
{'absolute_path': 'Y:\\CloudMusic\\Anson Seabra - Keep Your Head Up Princess.mp3',
'filename': 'Anson Seabra - Keep Your Head Up Princess.mp3', 'extent_count': 1,
'start_byte': 713846784, 'length': 7970816, 'fragment_index': 1}, {
'absolute_path': 'Y:\\CloudMusic\\Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3',
'filename': 'Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3',
'extent_count': 1, 'start_byte': 721817600, 'length': 9179136, 'fragment_index': 1},
{'absolute_path': 'Y:\\CloudMusic\\Ava Max - Sweet but Psycho.mp3',
'filename': 'Ava Max - Sweet but Psycho.mp3', 'extent_count': 1, 'start_byte': 731000832,
'length': 7938048, 'fragment_index': 1},
{'absolute_path': 'Y:\\CloudMusic\\Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3',
'filename': 'Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3', 'extent_count': 1,
'start_byte': 738938880, 'length': 6791168, 'fragment_index': 1},
{'absolute_path': 'Y:\\CloudMusic\\Color Music Choir - Something Just Like This (Live).mp3',
'filename': 'Color Music Choir - Something Just Like This (Live).mp3', 'extent_count': 1,
'start_byte': 745730048, 'length': 6193152, 'fragment_index': 1}]
for item in test_file_sort:
extent_count = item['extent_count']
if extent_count == 1:
CopySingleFragmentFiles(item, target_path)
elif extent_count > 1:
CopyMultiFragmentFiles(item, fragment_lists, target_path)

test/files_save.py Normal file (160 lines)

@@ -0,0 +1,160 @@
import os
def ExtractVolumeLetter(path: str) -> str:
"""Extract the volume letter from an absolute path."""
drive = os.path.splitdrive(path)[0]
if not drive:
raise ValueError(f"Cannot extract a volume letter from path: {path}")
return drive[0].upper() # returns e.g. 'Y'
def CopySingleFragmentFiles(source_data_dict, target_path):
"""
Read data from the raw disk at the given start byte and length, and save it as the target file.
:param source_data_dict: dict describing the source data
:param target_path: target folder path
"""
start_byte = source_data_dict.get("start_byte")
byte_length = source_data_dict.get("length")
absolute_path = source_data_dict.get("absolute_path")
file_name = source_data_dict.get("filename")
if not byte_length or byte_length <= 0:
print("Error: invalid byte length")
return
if not absolute_path or not file_name:
print("Error: missing required file information")
return
source_disk_path = ExtractVolumeLetter(absolute_path)
target_file_path = os.path.join(target_path, file_name)
try:
# Create the target directory if it does not exist
os.makedirs(target_path, exist_ok=True)
with open(fr"\\.\{source_disk_path}:", 'rb') as disk:
disk.seek(start_byte)
with open(target_file_path, 'wb') as f:
remaining = byte_length
CHUNK_SIZE = 1024 * 1024 # 1MB
while remaining > 0:
read_size = min(CHUNK_SIZE, remaining)
chunk = disk.read(read_size)
if not chunk:
print("Warning: read empty data; the end of the disk may have been reached.")
break
f.write(chunk)
remaining -= len(chunk)
print(
f"Success: read {byte_length} bytes from byte offset {start_byte}, saved as {target_file_path}")
except PermissionError:
print("Error: administrator privileges are required to access the disk device; run this program as administrator")
except Exception as e:
print(f"An error occurred: {str(e)}")
def ReadDiskBytes(volume_letter: str, start_byte: int, length: int) -> bytes:
"""
Read a given number of bytes from the given disk starting at the given byte position.
:param volume_letter: volume letter (e.g. "Y")
:param start_byte: starting byte position (int)
:param length: number of bytes to read (int)
:return: the raw bytes read
"""
if not isinstance(volume_letter, str) or len(volume_letter.strip()) != 1:
raise ValueError("volume_letter must be a single letter, e.g. 'Y'")
# Build the Windows device path, e.g. \\.\Y:
disk_path = f"\\\\.\\{volume_letter.strip().upper()}:"
try:
with open(disk_path, "rb") as disk:
disk.seek(start_byte)
data = disk.read(length)
return data
except PermissionError:
raise PermissionError("Insufficient permissions; run the program as administrator")
except Exception as e:
raise RuntimeError(f"Failed to read disk: {e}")
# if __name__ == "__main__":
# drive = "Y"
# start = 687685632
# size = 7163904
#
# try:
# content = ReadDiskBytes(drive, start, size)
# print(f"Read {len(content)} bytes. First 100 bytes:")
# print(content[:100])
# except Exception as e:
# print("Error:", e)
def CopyMultiFragmentFiles(
item: dict,
fragment_lists: dict,
target_path: str
):
"""
Handle a file with multiple fragments: read each fragment, assemble them in order, and write the result to disk.
:param item: dict describing one fragment of the file
:param fragment_lists: dict holding each file's fragment contents
:param target_path: target folder for the recovered file
:return: None
"""
file_name = item['filename']
extent_count = item['extent_count']
fragment_index = item['fragment_index']
start_byte = item['start_byte']
length_byte = item['length']
volume_letter = ExtractVolumeLetter(item['absolute_path'])
# Read this fragment's content
fragment_content = ReadDiskBytes(volume_letter, start_byte, length_byte)
# Initialize the fragment list for this file if needed
if file_name not in fragment_lists:
fragment_lists[file_name] = [None] * extent_count
# Insert the content at its position
if 1 <= fragment_index <= extent_count:
fragment_lists[file_name][fragment_index - 1] = fragment_content
print(f"Wrote fragment {fragment_index} of {file_name}.")
else:
print(f"Warning: fragment_index out of range for {file_name}: {fragment_index} / {extent_count}")
# Check whether all fragments have been loaded
fragments = fragment_lists[file_name]
if None not in fragments:
full_content = b''.join(fragments)
target_file_path = os.path.join(target_path, file_name)
try:
with open(target_file_path, 'wb') as f:
f.write(full_content)
print(f"Successfully recovered file: {file_name}")
except Exception as e:
print(f"Failed to write file {file_name}: {e}")
if __name__ == "__main__":
test_dict = {
'absolute_path': 'Y:\\CloudMusic\\Aaron Zigman - Main Title.mp3',
'filename': 'Aaron Zigman - Main Title.mp3',
'extent_count': 1,
'start_byte': 687685632,
'length': 7163904,
'fragment_index': 1
}
CopySingleFragmentFiles(test_dict, target_path=r"Z:\RecoveredFiles")

test/files_sort.py Normal file (232 lines)

@@ -0,0 +1,232 @@
import sqlite3
def GetFilesDBPathInfo(db_path: str = "../src/db_ntfs_info.db",
table_name: str = "db_path",
files_path=None) -> list:
"""
Look up the ID and Name fields for each of the given file paths in the specified table.
:param db_path: database file path
:param table_name: table to query
:param files_path: list of full file paths
:return: result list; each item is {'absolute_path': str, 'id': int, 'name': str}
"""
if files_path is None:
files_path = []
results = []
# Connect to the database
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
for path in files_path:
try:
# The table name is interpolated via f-string; parameter binding only works for values
sql = f"SELECT ID, Name FROM {table_name} WHERE Path = ?"
cursor.execute(sql, (path,))
row = cursor.fetchone()
if row:
results.append({
'absolute_path': path,
'id': row[0],
'name': row[1]
})
else:
print(f"No matching record found: {path}")
except Exception as e:
print(f"Query failed for {path}: {e}")
conn.close()
return results
# if __name__ == "__main__":
# test_files = [
# r"CloudMusic/AGA - MIZU.mp3",
# r"CloudMusic/AGA - 一.mp3",
# r"CloudMusic/Aaron Zigman - Main Title.mp3",
# r"CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3",
# r"CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3",
# r"CloudMusic/Ava Max - Sweet but Psycho.mp3",
# r"CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3",
# r"CloudMusic/Color Music Choir - Something Just Like This (Live).mp3"
# ]
#
# result = GetFilesDBPathInfo(files_path=test_files)
# for item in result:
# print(item)
def GetFilesDBNodeInfo(db_path: str = "../src/db_ntfs_info.db", table_name: str = "db_node",
path_records: list = None) -> list:
"""
Use the IDs from the db_path query results to look up the corresponding extent (fragment) info in the db_node table.
:param db_path: database file path
:param table_name: db_node table name
:param path_records: result list from GetFilesDBPathInfo
:return: result list with each file's fragment info
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
results = []
for record in path_records:
path_id = record['id']
absolute_path = record['absolute_path']
name = record['name']
try:
# Look up the db_node record for this PathID
cursor.execute(f"SELECT * FROM {table_name} WHERE PathID = ?", (path_id,))
row = cursor.fetchone()
if not row:
print(f"No record found for PathID={path_id} in table {table_name}")
continue
# Get the column names (for access by name)
columns = [desc[0] for desc in cursor.description]
# Build a dict for access by column name
node_data = dict(zip(columns, row))
# Get ExtentCount
extent_count = node_data.get("ExtentCount", 0)
# Parse the fragment info
fragments = []
for i in range(1, 5): # extent1 ~ extent4
loc = node_data.get(f"extent{i}_Location")
length = node_data.get(f"extent{i}_Length")
if loc is not None and length is not None and length > 0:
fragments.append({
"start_byte": loc,
"length": length
})
results.append({
"absolute_path": absolute_path,
"name": name,
"path_id": path_id,
"extent_count": extent_count,
"fragments": fragments
})
except Exception as e:
print(f"Query failed for PathID={path_id}: {e}")
conn.close()
return results
if __name__ == "__main__":
test_files = [
r"CloudMusic/AGA - MIZU.mp3",
r"CloudMusic/AGA - 一.mp3",
r"CloudMusic/Aaron Zigman - Main Title.mp3",
r"CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3",
r"CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3",
r"CloudMusic/Ava Max - Sweet but Psycho.mp3",
r"CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3",
r"CloudMusic/Color Music Choir - Something Just Like This (Live).mp3"
]
# Step 1: get the ID and Name from the db_path table
path_info = GetFilesDBPathInfo(files_path=test_files)
# Step 2: look up the fragment info in db_node by PathID
file_extents_info = GetFilesDBNodeInfo(path_records=path_info)
# Print the results
for item in file_extents_info:
print(item)
def sort_fragments_by_start_byte(file_extents_list: list) -> list:
"""
Sort all files' fragments by start_byte and label each with its fragment index.
:param file_extents_list: result list from GetFilesDBNodeInfo
:return: fragment list sorted by start_byte, including file path, filename, and fragment index
"""
all_fragments = []
for file_info in file_extents_list:
absolute_path = file_info['absolute_path']
filename = file_info['name']
extent_count = file_info['extent_count']
fragments = file_info['fragments']
# Sort this file's fragments (usually already in order)
sorted_fragments = sorted(fragments, key=lambda x: x['start_byte'])
# Attach the fragment index
for idx, fragment in enumerate(sorted_fragments, start=1):
all_fragments.append({
'absolute_path': absolute_path,
'filename': filename,
'extent_count': extent_count,
'start_byte': fragment['start_byte'],
'length': fragment['length'],
'fragment_index': idx
})
# Global sort: order all fragments by start_byte
all_fragments.sort(key=lambda x: x['start_byte'])
return all_fragments
# if __name__ == "__main__":
# test_files = [
# r"CloudMusic/AGA - MIZU.mp3",
# r"CloudMusic/AGA - 一.mp3",
# r"CloudMusic/Aaron Zigman - Main Title.mp3",
# r"CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3",
# r"CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3",
# r"CloudMusic/Ava Max - Sweet but Psycho.mp3",
# r"CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3",
# r"CloudMusic/Color Music Choir - Something Just Like This (Live).mp3"
# ]
# test_files_sort = [
# {'absolute_path': 'CloudMusic/AGA - MIZU.mp3', 'name': 'AGA - MIZU.mp3', 'path_id': 6, 'extent_count': 1,
# 'fragments': [{'start_byte': 694849536, 'length': 8126464}]},
# {'absolute_path': 'CloudMusic/AGA - 一.mp3', 'name': 'AGA - 一.mp3', 'path_id': 7, 'extent_count': 2,
# 'fragments': [{'start_byte': 702976000, 'length': 10870784}, {'start_byte': 23162880, 'length': 69632}]},
# {'absolute_path': 'CloudMusic/Aaron Zigman - Main Title.mp3', 'name': 'Aaron Zigman - Main Title.mp3',
# 'path_id': 5, 'extent_count': 1, 'fragments': [{'start_byte': 687685632, 'length': 7163904}]},
# {'absolute_path': 'CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3',
# 'name': 'Anson Seabra - Keep Your Head Up Princess.mp3', 'path_id': 8, 'extent_count': 1,
# 'fragments': [{'start_byte': 713846784, 'length': 7970816}]},
# {'absolute_path': 'CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3',
# 'name': 'Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3', 'path_id': 9,
# 'extent_count': 1, 'fragments': [{'start_byte': 721817600, 'length': 9179136}]},
# {'absolute_path': 'CloudMusic/Ava Max - Sweet but Psycho.mp3', 'name': 'Ava Max - Sweet but Psycho.mp3',
# 'path_id': 10, 'extent_count': 1, 'fragments': [{'start_byte': 731000832, 'length': 7938048}]},
# {'absolute_path': 'CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3',
# 'name': 'Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3', 'path_id': 11, 'extent_count': 1,
# 'fragments': [{'start_byte': 738938880, 'length': 6791168}]},
# {'absolute_path': 'CloudMusic/Color Music Choir - Something Just Like This (Live).mp3',
# 'name': 'Color Music Choir - Something Just Like This (Live).mp3', 'path_id': 12, 'extent_count': 1,
# 'fragments': [{'start_byte': 745730048, 'length': 6193152}]}]
#
# path_info = GetFilesDBPathInfo(files_path=test_files)
# file_extents_data = GetFilesDBNodeInfo(path_records=path_info)
#
# # Sort fragments by their on-disk order
# single_fragment_result = sort_fragments_by_start_byte(file_extents_data)
#
# # Simulated multi-file fragments, sorted by on-disk order
# multi_fragment_result = sort_fragments_by_start_byte(test_files_sort)
#
# print("Single-file fragment sort results:")
# for item in single_fragment_result:
# print(item)
#
# print("\nMulti-file fragment sort results:")
# for item in multi_fragment_result:
# print(item)

test/folders_sort.py Normal file (199 lines)

@@ -0,0 +1,199 @@
import os
import sqlite3
from files_sort import GetFilesDBNodeInfo, sort_fragments_by_start_byte
def GetFolderID(
folder_path: str,
db_path: str = "../src/db_ntfs_info.db",
table_name: str = "db_path"
) -> int | None:
"""
Look up the database ID for the given folder path.
:param folder_path: folder path (e.g. r"CloudMusic\\")
:param db_path: database file path
:param table_name: table to query, defaults to 'db_path'
:return: the ID (int) on success, None on failure
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
# Build the SQL query with table_name
sql = f"SELECT ID FROM {table_name} WHERE Path = ?"
cursor.execute(sql, (folder_path,))
result = cursor.fetchone()
if result:
return result[0]
else:
print(f"Path not found: {folder_path} in table {table_name}")
return None
except sqlite3.Error as e:
print(f"Database operation failed: {e}")
return None
finally:
conn.close()
def GetSubPathsByParentID(
parent_id: int,
db_path: str = "../src/db_ntfs_info.db",
table_name: str = "db_path"
) -> list:
"""
Query the db_path table for the children (files/folders) of the given ParentID.
:param parent_id: parent node ID
:param db_path: database file path
:param table_name: table name
:return: list of dicts with ID, Path, and Name
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
sql = f"""
SELECT ID, Path, Name
FROM {table_name}
WHERE ParentID = ?
"""
try:
cursor.execute(sql, (parent_id,))
rows = cursor.fetchall()
except Exception as e:
print(f"数据库查询失败:{e}")
return []
results = []
for row in rows:
item = {
'id': row[0],
'absolute_path': row[1],
'name': row[2]
}
results.append(item)
conn.close()
return results
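
# Editorial sketch (not part of this commit): GetSubPathsByParentID returns only
# the direct children of a node. If nested sub-folders ever need to be expanded,
# a recursive walk over the ParentID links could look like this (hypothetical
# helper; name and behaviour are assumptions):
def WalkSubPathsRecursive(parent_id, db_path="../src/db_ntfs_info.db"):
    items = []
    for child in GetSubPathsByParentID(parent_id, db_path=db_path):
        items.append(child)
        # Files have no children, so the recursive call simply returns []
        items.extend(WalkSubPathsRecursive(child['id'], db_path=db_path))
    return items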
# if __name__ == "__main__":
#     test_folder_path = "pictures/"
#     parent_id_test = GetFolderID(test_folder_path)
#     # node_data = GetNodeFragmentsByParentID(parent_id_test)
#     path_data = GetSubPathsByParentID(parent_id_test)
#     node_data = GetFilesDBNodeInfo(path_records=path_data)
#     for data in node_data:
#         print(data)

def GetSortFragmentsByFolderPath(db_path: str = "../src/db_ntfs_info.db", folder_path: str = None) -> list:
    """
    Query the fragment information of every file under the given folder.

    :param db_path: database to query
    :param folder_path: absolute path of the folder
    :return list: all files under the folder, ordered by fragment position
    """
    parent_id = GetFolderID(folder_path=folder_path, db_path=db_path)
    path_data = GetSubPathsByParentID(parent_id=parent_id, db_path=db_path)
    node_data = GetFilesDBNodeInfo(path_records=path_data)
    result = sort_fragments_by_start_byte(node_data)
    return result

# if __name__ == "__main__":
# folder_path_test = "pictures/"
# data = GetSortFragmentsByFolderPath(db_path="../src/db_ntfs_info.db", folder_path=folder_path_test)
# for item in data:
# print(item)

def ScanDirectory(root_dir, skip_system=True):
    """
    Recursively scan a directory and return file paths relative to the drive
    root, using '/' as the separator and with the drive letter stripped.

    :param root_dir: root directory to scan
    :param skip_system: whether to skip system directories (default True)
    :return: list of file paths in the form relative/path/to/file.ext
    """
    file_list = []
    for root, dirs, files in os.walk(root_dir):
        # Skip system directories
        if skip_system:
            dirs[:] = [d for d in dirs if not d.startswith('$') and d != "System Volume Information"]
        for file in files:
            full_path = os.path.join(root, file)
            # Strip the drive letter
            _, relative_path = os.path.splitdrive(full_path)
            # Normalise '\' to '/'
            relative_path = relative_path.lstrip("\\").replace("\\", "/")
            file_list.append(relative_path)
    return file_list
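
# Editorial worked example of the normalisation above (values illustrative):
#   os.path.splitdrive(r"Y:\CloudMusic\AGA - MIZU.mp3")
#       -> ("Y:", "\\CloudMusic\\AGA - MIZU.mp3")
#   .lstrip("\\").replace("\\", "/")
#       -> "CloudMusic/AGA - MIZU.mp3"
# which appears to match the relative-path convention stored in the db_path table.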
# if __name__ == "__main__":
#     folder_path = r"Y:/folder1/"
#     files_list = ScanDirectory(folder_path)
#
#     print(f"Found {len(files_list)} files:")
#     for f in files_list:
#         print(f)

def ScanMultiFolders(folder_paths, skip_system=True):
    """
    Scan multiple root directories and return the file paths found in all of
    their subdirectories.

    :param folder_paths: list of root directories to scan
    :param skip_system: whether to skip system directories (default True)
    :return: list of relative file paths in the form folder/file.ext
    """
    all_files = []
    for root_dir in folder_paths:
        # Skip roots that do not exist
        if not os.path.exists(root_dir):
            print(f"⚠️ Path does not exist: {root_dir}")
            continue
        for root, dirs, files in os.walk(root_dir):
            # Skip system directories
            if skip_system:
                dirs[:] = [d for d in dirs if not d.startswith('$') and d != "System Volume Information"]
            for file in files:
                full_path = os.path.join(root, file)
                # Strip the drive letter and normalise separators
                _, relative_path = os.path.splitdrive(full_path)
                relative_path = relative_path.lstrip("\\").replace("\\", "/")
                all_files.append(relative_path)
    return all_files

if __name__ == "__main__":
    folders = [
        r"Y:\CloudMusic",
        r"Y:\folder1"
    ]
    files = ScanMultiFolders(folders)
    print(f"Found {len(files)} files:")
    for f in files:
        print(f)

92
test/get_extent_counts.py Normal file
View File

@@ -0,0 +1,92 @@
def analyze_ntfs_data_attribute(data):
    """
    Analyze the 0x80 ($DATA) attribute of an NTFS record and return the number
    of file fragments.

    Args:
        data (list): list of dicts, each carrying a 'sequence' of hex strings
                     (see the sample structure below)

    Returns:
        int: fragment count (1 for a resident attribute; otherwise the number
             of data runs)

    Raises:
        ValueError: if the input data is invalid
    """
    # Step 1: collect and flatten the 'sequence' hex strings
    hex_bytes = []
    for entry in data:
        if 'sequence' in entry:
            for hex_str in entry['sequence']:
                hex_bytes.extend(hex_str.split())
    print(hex_bytes)

    # Convert the hex strings to a list of integers
    try:
        attribute_data = [int(x, 16) for x in hex_bytes]
    except ValueError:
        raise ValueError("Invalid hexadecimal data")

    # Step 2: parse the attribute structure
    if len(attribute_data) < 24:
        raise ValueError("Attribute data too short to parse the header")

    # Check the attribute type (0x80)
    if attribute_data[0] != 0x80:
        raise ValueError("Not a 0x80 ($DATA) attribute")

    # Non-resident flag at offset 0x08 (0x00 means resident)
    is_resident = attribute_data[8] == 0
    if is_resident:
        return 1
    else:
        # Non-resident: the run-list offset is a 16-bit value at offset 0x20
        data_run_offset = attribute_data[0x20] | (attribute_data[0x21] << 8)
        if data_run_offset >= len(attribute_data):
            raise ValueError("Data-run offset exceeds the attribute length")
        data_runs = attribute_data[data_run_offset:]
        fragment_count = 0
        pos = 0
        while pos < len(data_runs):
            header_byte = data_runs[pos]
            if header_byte == 0x00:
                break
            # High nibble: size of the run-offset field; low nibble: size of
            # the run-length field (the length bytes precede the offset bytes)
            offset_size = (header_byte >> 4) & 0x0F
            length_size = header_byte & 0x0F
            if offset_size == 0 or length_size == 0:
                break
            pos += 1 + offset_size + length_size
            fragment_count += 1
        return fragment_count

input_data = [
{
'start_byte': 3221267456,
'offset': 264,
'sequence': [
'80 00 00 00 48 00 00 00',
'01 00 00 00 00 00 01 00',
'00 00 00 00 00 00 00 00',
'79 00 00 00 00 00 00 00',
'40 00 00 00 00 00 00 00',
'00 a0 07 00 00 00 00 00',
'0b 93 07 00 00 00 00 00',
'0b 93 07 00 00 00 00 00',
'31 7a 00 ee 0b 00 00 00'
],
'is_resident': False,
'total_groups': 9,
'attribute_length': 72
}
]
print(analyze_ntfs_data_attribute(input_data))  # prints the fragment count
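
# Editorial worked example (not part of the commit): the run list above is the
# single run '31 7a 00 ee 0b'. Header 0x31 -> low nibble 1: a 1-byte length
# field (0x7A = 122 clusters); high nibble 3: a 3-byte offset field
# ('00 ee 0b' little-endian = 0x0BEE00 = 781824 clusters). One run in total,
# so analyze_ntfs_data_attribute returns 1.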

View File

@@ -0,0 +1,105 @@
def ParseDataRuns(data_bytes: list, cluster_size=512):
    """
    Parse the data runs of an NTFS $80 attribute and return the starting byte
    offset and byte length of each fragment.

    Args:
        data_bytes (list): the complete $80 attribute as a list of hex strings.
        cluster_size (int): cluster size in bytes (512 by default)

    Returns:
        dict: per-fragment information in the form:
            {
                "is_resident": False,
                "data_runs": {
                    "fragment 1": {"start_byte": 3202351104, "byte_length": 499712 - 1},
                    "fragment 2": {...}
                }
            }
    """

    def hex_list_to_int(lst, length, byteorder='little'):
        """Take `length` bytes from the list and convert them to an integer."""
        bytes_data = bytes([int(x, 16) for x in lst[:length]])
        return int.from_bytes(bytes_data, byteorder=byteorder)

    result = {
        "is_resident": True,
        "data_runs": {}
    }

    # Check that this is a $80 attribute
    if data_bytes[0] != '80':
        raise ValueError("Not a $80 attribute")

    # Non-resident flag at offset 0x08 (0x00 means resident)
    is_resident = data_bytes[8] == '00'
    result["is_resident"] = is_resident

    if is_resident:
        result["data_runs"]["resident file"] = {
            "start_byte": 0,
            "byte_length": "file is resident, no fragments"
        }
        return result

    # Non-resident attribute: the run-list offset is a 16-bit WORD at offset 0x20
    data_run_offset = hex_list_to_int(data_bytes[0x20:0x20 + 2], 2)
    if data_run_offset >= len(data_bytes):
        raise ValueError("Data-run offset out of range")

    # Extract the data-run list
    data_run_bytes = data_bytes[data_run_offset:]
    pos = 0
    fragment_index = 1
    current_lcn = 0

    while pos < len(data_run_bytes):
        header_byte = int(data_run_bytes[pos], 16)
        if header_byte == 0x00:
            break
        # Low nibble: byte count of the length field; high nibble: byte count
        # of the offset field. The length bytes come first in the stream.
        offset_size = (header_byte >> 4) & 0x0F
        length_size = header_byte & 0x0F
        if offset_size == 0 or length_size == 0:
            break
        pos += 1

        # Run length in clusters (little-endian)
        length = hex_list_to_int(data_run_bytes[pos:pos + length_size], length_size)

        # Run offset: a signed cluster delta relative to the previous run's LCN
        delta = hex_list_to_int(
            data_run_bytes[pos + length_size:pos + length_size + offset_size],
            offset_size
        )
        if delta >= 1 << (8 * offset_size - 1):
            delta -= 1 << (8 * offset_size)
        current_lcn += delta

        # Starting byte = LCN * cluster_size
        start_byte = current_lcn * cluster_size
        byte_length = length * cluster_size - 1

        result["data_runs"][f"fragment {fragment_index}"] = {
            "start_byte": start_byte,
            "byte_length": byte_length
        }

        pos += length_size + offset_size
        fragment_index += 1

    return result

input_data = [
'80', '00', '00', '00', '48', '00', '00', '00',
'01', '00', '00', '00', '00', '00', '01', '00',
'00', '00', '00', '00', '00', '00', '00', '00',
'79', '00', '00', '00', '00', '00', '00', '00',
'40', '00', '00', '00', '00', '00', '00', '00',
'00', 'a0', '07', '00', '00', '00', '00', '00',
'0b', '93', '07', '00', '00', '00', '00', '00',
'0b', '93', '07', '00', '00', '00', '00', '00',
'31', '7a', '00', 'ee', '0b', '00', '00', '00'
]
result = ParseDataRuns(input_data)
print(result)
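
# Editorial usage sketch (not part of the commit): with 4 KiB clusters, the
# parsed values reproduce the sample figures quoted in the docstring above.
result_4k = ParseDataRuns(input_data, cluster_size=4096)
print(result_4k)
# Expected: fragment 1 -> start_byte = 781824 * 4096 = 3202351104,
#           byte_length = 122 * 4096 - 1 = 499711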