Compare commits

15 Commits

Author SHA1 Message Date
Burgess Leo
4d7c2e995c project done but test failed 2025-05-27 13:10:13 +08:00
Burgess Leo
491685e892 optimize db_path memory 2025-05-23 18:01:42 +08:00
Burgess Leo
d4a411ce68 almost finish 2025-05-23 13:54:31 +08:00
Burgess Leo
1fb457b67d db_path style 2025-05-22 17:27:02 +08:00
Burgess Leo
d2a3a7b5b5 \db_path style 2025-05-22 17:21:44 +08:00
Burgess Leo
3347abe02f finish fragment files copy 2025-05-22 13:03:09 +08:00
Burgess Leo
0c98dfecda finish copy files follow bytes sort 2025-05-22 09:16:37 +08:00
Burgess Leo
cd536a6bd3 add SaveFile 2025-05-20 18:01:19 +08:00
Burgess Leo
08a47c6d8a finish all table analyze 2025-05-20 16:26:58 +08:00
846c7f6beb Delete src/db_ntfs_info.db 2025-05-19 17:47:30 +08:00
Burgess Leo
deaf97607e modify .gitignore 2025-05-19 17:46:49 +08:00
Burgess Leo
697b449bff xx 2025-05-19 17:33:30 +08:00
Burgess Leo
07a4ae7a74 temp restore 2025-05-19 13:25:07 +08:00
Burgess Leo
b2e14fdbe0 restore mft_analyze 2025-05-19 11:03:36 +08:00
Burgess Leo
e167ff5d9f temp store 2025-05-19 10:21:12 +08:00
22 changed files with 2044 additions and 322 deletions

View File

@@ -23,7 +23,12 @@ def ClearTableRecordsWithReset(db_path, table_name):
if __name__ == '__main__':
# ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_path')
# ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_device')
# ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_config')
ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_path')
ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_node')
ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_device')
ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_config')
ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_user')
ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_group')
ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_extend_extent')
ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_extend_name')
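The body of ClearTableRecordsWithReset sits outside this hunk. A minimal sketch of what a function with this signature presumably does (clear the table, then reset its AUTOINCREMENT counter), assuming plain sqlite3 — an illustration, not the implementation from the diff:

import sqlite3

def ClearTableRecordsWithReset(db_path, table_name):
    # Hypothetical body; the real one is not shown in this diff
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute(f"DELETE FROM {table_name}")
        # sqlite_sequence stores AUTOINCREMENT counters; deleting the row restarts IDs at 1
        cursor.execute("DELETE FROM sqlite_sequence WHERE name = ?", (table_name,))
        conn.commit()
    finally:
        conn.close()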

View File

@@ -86,9 +86,9 @@ def CreateDBDeviceTable(db_path='../src/db_ntfs_info.db', table_name='db_device'
def CreateDBNodeTable(db_path='../src/db_ntfs_info.db', table_name='db_node'):
"""
Create the NewDBNode table, which stores each file's concrete attributes and physical layout.
Creates an SQLite database at the given path and the node-information table inside it.
:param db_path: str, path to the database file
:param table_name: str, name of the table to create
:return: None
"""
@@ -100,18 +100,28 @@ def CreateDBNodeTable(db_path='../src/db_ntfs_info.db', table_name='db_node'):
# Connect to the SQLite database (the file is created automatically if it does not exist)
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA foreign_keys = ON;")  # enable foreign key support
# Create a cursor object
cursor = conn.cursor()
# Build the CREATE TABLE statement dynamically
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
ID INTEGER PRIMARY KEY AUTOINCREMENT,
PathID INTEGER NOT NULL,
PathID INTEGER,
ParentID INTEGER,
NameHash TEXT,
PathHash TEXT,
ExtendNameID INTEGER,
DirLayer INTEGER,
GroupID INTEGER,
UserID INTEGER,
FileCreateTime TEXT,
FileModifyTime TEXT,
FileAccessTime TEXT,
FileAuthTime TEXT,
FileSize INTEGER,
FileMode INTEGER,
FileHash TEXT,
ExtentCount INTEGER,
extent1_DeviceID INTEGER,
@@ -127,17 +137,21 @@ def CreateDBNodeTable(db_path='../src/db_ntfs_info.db', table_name='db_node'):
extent4_Location INTEGER,
extent4_Length INTEGER,
-- Foreign key constraints
FOREIGN KEY(PathID) REFERENCES NewDBPath(ID),
FOREIGN KEY(ExtendNameID) REFERENCES db_extend(ID),
FOREIGN KEY(GroupID) REFERENCES db_group(ID),
FOREIGN KEY(UserID) REFERENCES db_user(ID)
-- Foreign key constraints (optional)
FOREIGN KEY(PathID) REFERENCES db_path(ID),
FOREIGN KEY(ExtendNameID) REFERENCES extname_table(ID),
FOREIGN KEY(GroupID) REFERENCES groups(ID),
FOREIGN KEY(UserID) REFERENCES users(ID)
);
"""
# Execute the SQL statement
cursor.execute(create_table_sql)
# Commit the changes
conn.commit()
# Close the connection
conn.close()
print(f"Table [{table_name}] created successfully in database [{db_path}]")
@@ -266,11 +280,12 @@ def CreateDBExtendSnippetTable(db_path='../src/db_ntfs_info.db', table_name='db_
print(f"表 [{table_name}] 已在数据库 [{db_path}] 中创建成功")
def CreateDBPathTable(db_path='../src/db_ntfs_info.db', table_name='db_path'):
def CreateDBPathTable(db_path='../src/db_path.db', table_name='db_path'):
"""
Create the NewDBPath table, which stores path information for files/directories.
Creates an SQLite database at the given path and the path-information table inside it,
including a DeviceID field that marks which device (disk) each file belongs to.
:param db_path: str, path to the database file
:param table_name: str, name of the table to create
:return: None
"""
@@ -280,42 +295,39 @@ def CreateDBPathTable(db_path='../src/db_ntfs_info.db', table_name='db_path'):
if directory and not os.path.exists(directory):
os.makedirs(directory)
# 连接到SQLite数据库如果不存在会自动创建
# 连接到SQLite数据库如果文件不存在会自动创建)
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA foreign_keys = ON;") # 开启外键支持
cursor = conn.cursor()
# 动态构建创建表的SQL语句
# 动态构建创建表的SQL语句(包含 DeviceID 外键)
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
ID INTEGER PRIMARY KEY AUTOINCREMENT,
-- DeviceID TEXT NOT NULL,
Path TEXT NOT NULL,
Name TEXT NOT NULL,
DirLayer INTEGER NOT NULL,
PathHash TEXT UNIQUE NOT NULL,
IsDir INTEGER NOT NULL CHECK(IsDir IN (0, 1)),
ParentID INTEGER,
ContentSize INTEGER,
FileCreateTime TEXT,
FileModifyTime TEXT,
FileAccessTime TEXT,
FileAuthTime TEXT,
FileMode TEXT,
-- Foreign key constraints
-- FOREIGN KEY(DeviceID) REFERENCES db_device(ID),
FOREIGN KEY(ParentID) REFERENCES {table_name}(ID)
);
"""
# Execute the SQL statement
cursor.execute(create_table_sql)
# Commit the changes
conn.commit()
conn.close()
print(f"Table [{table_name}] created successfully in database [{db_path}]")
def CreateDBExtendNameTable(db_path='../src/db_ntfs_info.db', table_name='db_extend_name'):
def CreateDBExtendNameTable(db_path='../src/db_extend_name.db', table_name='db_extend_name'):
"""
Creates an SQLite database at the given path and the extension-name table inside it.

23
fake_main.py Normal file
View File

@@ -0,0 +1,23 @@
import itertools
from files_utils.files_save import CopySingleFragmentFiles, CopyMultiFragmentFiles
from files_utils.files_sort import GetSortFragments
from files_utils.folders_sort import ClassifyFilesAndFolders, ScanMultiFolders
fragment_lists = {}
target_path = r"Z:\test_files"
mix_test_data = [
"test-copy"
]
classify_files_and_folders = ClassifyFilesAndFolders(mix_test_data)
files_list = classify_files_and_folders["files"]
folders_files_list = ScanMultiFolders(classify_files_and_folders["folders"])
merged_list = list(itertools.chain(files_list, folders_files_list))
sort_fragments = GetSortFragments(db_path="./src/db_ntfs_info.db", files_list=merged_list)
for item in sort_fragments:
extent_count = item['extent_count']
if extent_count == 1:
CopySingleFragmentFiles(item, target_path=target_path)
elif extent_count > 1:
CopyMultiFragmentFiles(item, fragment_lists=fragment_lists, target_path=target_path)

131
files_utils/files_save.py Normal file
View File

@@ -0,0 +1,131 @@
import os
def GetVolumeLetter() -> str:
# Imported lazily to avoid a circular import with ntfs_utils.main
from ntfs_utils.main import volume_letter
return volume_letter
def CopySingleFragmentFiles(source_data_dict, target_path):
"""
Read data from the disk at the given start byte and length, and save it as the target file.
:param source_data_dict: dict with the source data info
:param target_path: target folder path
"""
start_byte = source_data_dict.get("start_byte")
byte_length = source_data_dict.get("length")
absolute_path = source_data_dict.get("absolute_path")
file_name = source_data_dict.get("filename")
if byte_length <= 0:
print("Error: invalid byte length")
return
if not absolute_path or not file_name:
print("Error: missing required file information")
return
source_disk_path = GetVolumeLetter()
target_file_path = os.path.join(target_path, file_name)
try:
# Create the target directory (if it does not exist)
os.makedirs(target_path, exist_ok=True)
with open(fr"\\.\{source_disk_path}:", 'rb') as disk:
disk.seek(start_byte)
with open(target_file_path, 'wb') as f:
remaining = byte_length
CHUNK_SIZE = 1024 * 1024 # 1MB
while remaining > 0:
read_size = min(CHUNK_SIZE, remaining)
chunk = disk.read(read_size)
if not chunk:
print("警告:读取到空数据,可能已到达磁盘末尾。")
break
f.write(chunk)
remaining -= len(chunk)
print(
f"Success: read {byte_length} bytes from byte offset {start_byte}, saved as {target_file_path}")
except PermissionError:
print("Error: administrator privileges are required to access the disk device; run this program as administrator")
except Exception as e:
print(f"An error occurred: {str(e)}")
def ReadDiskBytes(volume_letter: str, start_byte: int, length: int) -> bytes:
"""
Read a given number of bytes from the given disk starting at the given position.
:param volume_letter: drive letter (e.g. "Y")
:param start_byte: starting byte position (int)
:param length: number of bytes to read (int)
:return: the raw bytes read
"""
if not isinstance(volume_letter, str) or len(volume_letter.strip()) != 1:
raise ValueError("volume_letter must be a single letter, e.g. 'Y'")
# Build the Windows device path, e.g. \\.\Y:
disk_path = f"\\\\.\\{volume_letter.strip().upper()}:"
try:
with open(disk_path, "rb") as disk:
disk.seek(start_byte)
data = disk.read(length)
return data
except PermissionError:
raise PermissionError("Insufficient permissions; run the program as administrator")
except Exception as e:
raise RuntimeError(f"Disk read failed: {e}")
def CopyMultiFragmentFiles(
item: dict,
fragment_lists: dict,
target_path: str
):
"""
Handle a file with multiple fragments: read each fragment, assemble them in order, and write the result to disk.
:param item: dict with the file's fragment info
:param fragment_lists: dict holding each file's fragment contents
:param target_path: target path for the recovered file
:return: None
"""
file_name = item['filename']
extent_count = item['extent_count']
fragment_index = item['fragment_index']
start_byte = item['start_byte']
length_byte = item['length']
volume_letter = GetVolumeLetter()
# Read the fragment content
fragment_content = ReadDiskBytes(volume_letter, start_byte, length_byte)
# Initialize the fragment list for this file if it does not exist yet
if file_name not in fragment_lists:
fragment_lists[file_name] = [None] * extent_count
# Insert the content at its position
if fragment_index <= extent_count:
fragment_lists[file_name][fragment_index - 1] = fragment_content
print(f"Wrote fragment {fragment_index} of {file_name}.")
else:
print(f"Warning: fragment_index out of range for {file_name}: {fragment_index} / {extent_count}")
# Check whether all fragments have been loaded
fragments = fragment_lists[file_name]
if None not in fragments:
full_content = b''.join(fragments)
target_file_path = os.path.join(target_path, file_name)
try:
with open(target_file_path, 'wb') as f:
f.write(full_content)
print(f"Successfully recovered file: {file_name}")
except Exception as e:
print(f"Failed to write file: {file_name}, error: {e}")

148
files_utils/files_sort.py Normal file
View File

@@ -0,0 +1,148 @@
import sqlite3
def GetFilesDBPathInfo(db_path: str = "../src/db_ntfs_info.db",
table_name: str = "db_path",
files_path=None) -> list:
"""
For each file path in the given list, look up the matching record's ID and Name in the given table.
:param db_path: database file path
:param table_name: table to query
:param files_path: list of full file paths
:return: list of results, each {'absolute_path': str, 'id': int, 'name': str}
"""
if files_path is None:
files_path = []
results = []
# Connect to the database
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
for path in files_path:
try:
# The table name is formatted into the SQL string; parameter placeholders only bind values
sql = f"SELECT ID, Name FROM {table_name} WHERE Path = ?"
cursor.execute(sql, (path,))
row = cursor.fetchone()
if row:
results.append({
'absolute_path': path,
'id': row[0],
'name': row[1]
})
else:
print(f"未找到匹配记录:{path}")
except Exception as e:
print(f"查询失败:{path},错误:{e}")
conn.close()
return results
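Since the table name is formatted straight into the SQL string (placeholders only bind values, not identifiers), a small whitelist guard is cheap insurance. A sketch with an assumed table set (hypothetical helper, not part of the diff):

ALLOWED_TABLES = {"db_path", "db_node", "db_device", "db_config",
                  "db_user", "db_group", "db_extend_name"}

def CheckTableName(table_name: str) -> str:
    # Reject any table name outside the known schema before it reaches an f-string SQL
    if table_name not in ALLOWED_TABLES:
        raise ValueError(f"unexpected table name: {table_name}")
    return table_name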
def GetFilesDBNodeInfo(db_path: str = "../src/db_ntfs_info.db", table_name: str = "db_node",
path_records: list = None) -> list:
"""
Use the IDs from the db_path query results to look up the corresponding extent (fragment) info in db_node.
:param db_path: database file path
:param table_name: db_node table name
:param path_records: result list from GetFilesDBPathInfo
:return: result list with each file's fragment info
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
results = []
for record in path_records:
path_id = record['id']
absolute_path = record['absolute_path']
name = record['name']
try:
# Look up the record with this PathID in the db_node table
cursor.execute(f"SELECT * FROM {table_name} WHERE PathID = ?", (path_id,))
row = cursor.fetchone()
if not row:
print(f"No record for PathID={path_id} in table {table_name}")
continue
# Get the column names (for access by name)
columns = [desc[0] for desc in cursor.description]
# Build a dict keyed by column name
node_data = dict(zip(columns, row))
# Get ExtentCount
extent_count = node_data.get("ExtentCount", 0)
# Parse the fragment info
fragments = []
for i in range(1, 5): # extent1 ~ extent4
loc = node_data.get(f"extent{i}_Location")
length = node_data.get(f"extent{i}_Length")
if loc is not None and length is not None and length > 0:
fragments.append({
"start_byte": loc,
"length": length
})
results.append({
"absolute_path": absolute_path,
"name": name,
"path_id": path_id,
"extent_count": extent_count,
"fragments": fragments
})
except Exception as e:
print(f"查询失败PathID={path_id}, 错误:{e}")
conn.close()
return results
def SortFragmentsByStartByte(file_extents_list: list) -> list:
"""
Sort all files' fragments by start_byte and tag each with its fragment index.
:param file_extents_list: result list from GetFilesDBNodeInfo
:return: fragment list sorted by start_byte, with file path, file name, fragment index, etc.
"""
all_fragments = []
for file_info in file_extents_list:
absolute_path = file_info['absolute_path']
filename = file_info['name']
extent_count = file_info['extent_count']
fragments = file_info['fragments']
# Sort this file's fragments (they are usually already in order)
sorted_fragments = sorted(fragments, key=lambda x: x['start_byte'])
# Attach the fragment index
for idx, fragment in enumerate(sorted_fragments, start=1):
all_fragments.append({
'absolute_path': absolute_path,
'filename': filename,
'extent_count': extent_count,
'start_byte': fragment['start_byte'],
'length': fragment['length'],
'fragment_index': idx
})
# Global sort: order all fragments by start_byte
all_fragments.sort(key=lambda x: x['start_byte'])
return all_fragments
def GetSortFragments(db_path: str = "../src/db_ntfs_info.db", files_list: list = None) -> list:
path_info = GetFilesDBPathInfo(db_path=db_path, table_name="db_path", files_path=files_list)
node_info = GetFilesDBNodeInfo(db_path=db_path, table_name="db_node", path_records=path_info)
result = SortFragmentsByStartByte(node_info)
return result

View File

@@ -0,0 +1,14 @@
import subprocess
source_path = r"Y:\\test-copy"
target_path = r"Z:\\test-copy"
subprocess.run([
"robocopy",
source_path,
target_path,
"/E", # 包括子目录
"/R:3", # 重试次数
"/W:1", # 重试等待时间
"/MT:16" # 多线程16线程
])

263
files_utils/folders_sort.py Normal file
View File

@@ -0,0 +1,263 @@
import os
import sqlite3
from files_utils.files_sort import GetFilesDBNodeInfo, SortFragmentsByStartByte
from ntfs_utils.main import volume_letter
def GetFolderID(
folder_path: str,
db_path: str = "../src/db_ntfs_info.db",
table_name: str = "db_path"
) -> int | None:
"""
Look up the database ID of the given folder path.
:param folder_path: folder path (e.g. r"CloudMusic\\")
:param db_path: database file path
:param table_name: table to query, defaults to 'db_path'
:return: the ID (int) on success, None on failure
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
# Build the SQL query with table_name
sql = f"SELECT ID FROM {table_name} WHERE Path = ?"
cursor.execute(sql, (folder_path,))
result = cursor.fetchone()
if result:
return result[0]
else:
print(f"未找到路径:{folder_path} 在表 {table_name}")
return None
except sqlite3.Error as e:
print(f"数据库操作失败:{e}")
return None
finally:
conn.close()
def GetSubPathsByParentID(
parent_id: int,
db_path: str = "../src/db_ntfs_info.db",
table_name: str = "db_path"
) -> list:
"""
Query the db_path table for the children (files/folders) of the given ParentID.
:param parent_id: parent node ID
:param db_path: database file path
:param table_name: table name
:return: list of dicts with ID, Path, Name
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
sql = f"""
SELECT ID, Path, Name
FROM {table_name}
WHERE ParentID = ?
"""
try:
cursor.execute(sql, (parent_id,))
rows = cursor.fetchall()
except Exception as e:
print(f"数据库查询失败:{e}")
return []
results = []
for row in rows:
item = {
'id': row[0],
'absolute_path': row[1],
'name': row[2]
}
results.append(item)
conn.close()
return results
def GetSortFragmentsByFolderPath(db_path: str = "../src/db_ntfs_info.db", folder_path: str = None) -> list:
"""
Query the database for the fragment info of every file under the given folder.
:param db_path: database to query
:param folder_path: absolute path of the folder
:return list: all files under the folder, ordered by fragment position
"""
parent_id = GetFolderID(folder_path=folder_path, db_path=db_path)
path_data = GetSubPathsByParentID(parent_id=parent_id, db_path=db_path)
node_data = GetFilesDBNodeInfo(path_records=path_data)
result = SortFragmentsByStartByte(node_data)
return result
# if __name__ == "__main__":
# folder_path_test = "pictures/"
# data = GetSortFragmentsByFolderPath(db_path="../src/db_ntfs_info.db", folder_path=folder_path_test)
# for item in data:
# print(item)
def ScanDirectory(root_dir, skip_system=True):
"""
Recursively scan the given directory and return paths relative to the drive letter
(using '/' separators), without the drive letter itself.
:param root_dir: root directory to scan
:param skip_system: whether to skip system directories (default True)
:return: list of file paths in the form relative/path/to/file.ext
"""
file_list = []
for root, dirs, files in os.walk(root_dir):
# Skip system directories
if skip_system:
dirs[:] = [d for d in dirs if not d.startswith('$') and d != "System Volume Information"]
for file in files:
full_path = os.path.join(root, file)
# Strip the drive letter
_, relative_path = os.path.splitdrive(full_path)
# Replace \ with /
relative_path = relative_path.lstrip("\\").replace("\\", "/")
file_list.append(relative_path)
return file_list
# if __name__ == "__main__":
# folder_path = r"Y:/folder1/"
# files_list = ScanDirectory(folder_path)
#
# print(f"共找到 {len(files_list)} 个文件:")
# for f in files_list:
# print(f)
def ScanMultiFolders(folder_paths_list: list, skip_system: bool = True) -> list:
"""
Scan multiple root directories and return all files' relative paths (in the form folder/file.ext).
:param folder_paths_list: list of root directories to scan (e.g. ["CloudMusic/", "folder1/"])
:param skip_system: whether to skip system directories
:return: list of file paths (normalized to folder/file.ext)
"""
all_files = []
for root_dir in folder_paths_list:
# Normalize the input path; make sure directories end with '/'
normalized_root_dir = root_dir.replace("\\", "/")
if not normalized_root_dir.endswith("/"):
normalized_root_dir += "/"  # ensure a trailing /
full_root_path = f"{volume_letter}:/{normalized_root_dir}"
full_root_path = os.path.normpath(full_root_path)
if not os.path.exists(full_root_path):
print(f"⚠️ Path does not exist: {full_root_path}")
continue
for root, dirs, files in os.walk(full_root_path):
if skip_system:
dirs[:] = [d for d in dirs if not d.startswith('$') and d != "System Volume Information"]
for file in files:
full_file_path = os.path.join(root, file)
# Strip the drive letter
_, relative_path = os.path.splitdrive(full_file_path)
# Strip leading/trailing '\' or '/' and normalize separators
normalized_path = relative_path.strip("\\").strip("/").replace("\\", "/")
all_files.append(normalized_path)
return all_files
# if __name__ == "__main__":
# folders = [
# "CloudMusic\\",
# "folder1/"
# ]
#
# files = ScanMultiFolders(folders)
#
# print(f"共找到 {len(files)} 个文件:")
# for f in files:
# print(f)
def ClassifyFilesAndFolders(paths: list) -> dict:
"""
Classify the given paths into files and directories, normalizing separators to '/'.
Directory paths are guaranteed to end with '/'.
:param paths: list of paths (files or directories)
:return: dict with 'files' and 'folders' keys; paths use '/' separators
"""
files = []
directories = []
for path in paths:
# Normalize to '/' separators, preserving whether the path ends with '/'
normalized_path = path.replace("\\", "/")
# Whether the path was originally marked as a directory (ends with '/' or '\')
is_potential_dir = normalized_path.endswith("/")
# Build the full path to check existence
full_path = f"{volume_letter}:/{normalized_path.lstrip('/')}"
full_path = os.path.normpath(full_path)
if os.path.isfile(full_path):
# For files, strip any trailing /
if normalized_path.endswith("/"):
normalized_path = normalized_path.rstrip("/")
files.append(normalized_path)
elif os.path.isdir(full_path):
# For directories, ensure a trailing '/'
if not normalized_path.endswith("/"):
normalized_path += "/"
directories.append(normalized_path)
else:
print(f"⚠️ Path does not exist or has unknown type: {normalized_path}")
return {
'files': files,
'folders': directories
}
# if __name__ == "__main__":
# test_paths = [
# "CloudMusic\\AGA - MIZU.mp3",
# "CloudMusic/AGA - 一.mp3",
# "CloudMusic/Aaron Zigman - Main Title.mp3",
# "CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3",
# "CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3",
# "CloudMusic/Ava Max - Sweet but Psycho.mp3",
# "CloudMusic\\",
# "folder1/",
# "CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3",
# "CloudMusic/Color Music Choir - Something Just Like This (Live).mp3"
# ]
#
# result = ClassifyFilesAndFolders(test_paths)
#
# print("✅ 文件列表:")
# for f in result['files']:
# print(f)
#
# print("\n📁 文件夹列表:")
# for d in result['directories']:
# print(d)

View File

@@ -1,18 +1,19 @@
import hashlib
import random
import os
import sqlite3
from mft_analyze import GetFile80hPattern
from datetime import datetime
from ntfs_utils.mft_analyze import GetFile80hPattern, GetFragmentData, ExtractSequenceHexValues, hex_list_to_int
from ntfs_utils.main import volume_letter
# Utility: get the file extension
def GetFileExtension(name: str) -> str:
parts = name.rsplit('.', 1)
if len(parts) > 1:
return parts[1].lower()
return ""
return parts[1].lower() if len(parts) > 1 else ""
# Get ExtendNameID (based on the file name suffix)
def GetExtendNameId(name: str, cursor: sqlite3.Cursor) -> int:
ext = GetFileExtension(name)
if not ext:
@@ -23,165 +24,243 @@ def GetExtendNameId(name: str, cursor: sqlite3.Cursor) -> int:
return result[0] if result else 0
# Get GroupID (defaults to the first record)
# Get DirLayer (path depth)
def GetDirLayer(path: str) -> int:
path = path.strip()
if not path or path in ("\\", "/"):
return 0
# Paths are stored with '/' separators (see ScanVolume); handle both styles
return path.replace("\\", "/").strip("/").count("/")
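A few worked values, assuming the '/'-separated relative paths that ScanVolume stores:

assert GetDirLayer("demo.txt") == 0  # file in the volume root
assert GetDirLayer("CloudMusic/AGA - MIZU.mp3") == 1
assert GetDirLayer("folder/subfolder/file.txt") == 2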
# Get GroupID (defaults to the first record)
def GetFirstGroupId(cursor: sqlite3.Cursor) -> int:
cursor.execute("SELECT ID FROM db_group ORDER BY ID LIMIT 1")
result = cursor.fetchone()
return result[0] if result else 0
# Get UserID (defaults to the first record)
def GetFirstUserId(cursor: sqlite3.Cursor) -> int:
cursor.execute("SELECT ID FROM db_user ORDER BY ID LIMIT 1")
result = cursor.fetchone()
return result[0] if result else 0
# [Pseudo] Get the file size in bytes
def GetFileSize(full_path: str) -> int:
return 10
def GetFilesTime(file_path):
"""
Get the file's creation, modification, access, and permission-change times.
st_atime: last access time (FileAccessTime)
st_mtime: last content-modification time (FileModifyTime)
st_ctime: metadata change time; on Windows this is the creation time (FileCreateTime)
Args:
file_path (str): absolute path of the file
Returns:
dict: FileCreateTime, FileModifyTime, FileAccessTime, FileAuthTime as strings,
or "default" for any value that cannot be obtained
"""
if not os.path.exists(file_path):
return {
"FileCreateTime": "default",
"FileModifyTime": "default",
"FileAccessTime": "default",
"FileAuthTime": "default"
}
try:
stat_info = os.stat(file_path)
def ts_to_str(timestamp):
return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')
create_time = ts_to_str(stat_info.st_ctime)
modify_time = ts_to_str(stat_info.st_mtime)
access_time = ts_to_str(stat_info.st_atime)
# Permission-change time (may not apply on Windows)
try:
auth_time = ts_to_str(getattr(stat_info, 'st_birthtime', stat_info.st_ctime))
except Exception:
auth_time = "default"
return {
"FileCreateTime": create_time,
"FileModifyTime": modify_time,
"FileAccessTime": access_time,
"FileAuthTime": auth_time
}
except Exception as e:
print(f"❌ 获取文件时间失败: {e}")
return {
"FileCreateTime": "default",
"FileModifyTime": "default",
"FileAccessTime": "default",
"FileAuthTime": "default"
}
# [Pseudo] Get the file content hash
def GetFileHash(full_path: str) -> str:
return hashlib.sha256(b"mocked_file_content").hexdigest()
# [Pseudo] Get the extent count
def GetExtentCount(full_path: str) -> int:
return 1
# Get the device ID (first record in db_device)
def GetDeviceId(cursor: sqlite3.Cursor) -> int:
cursor.execute("SELECT ID FROM db_device ORDER BY ID LIMIT 1")
result = cursor.fetchone()
return result[0] if result else 0
# [Pseudo] Get a random Location
def GetRandomLocation() -> int:
return random.randint(1000, 9999)
# Get the file size (parsed from the 0x80 attribute)
def GetFileSize(file80h_pattern):
if not file80h_pattern or not isinstance(file80h_pattern, list):
return 0
if file80h_pattern[0].get('is_resident'):
fragments = GetFragmentData(file80h_pattern)
if fragments and len(fragments) > 0:
return fragments[0].get('byte_length', 0)
else:
sequence_list = ExtractSequenceHexValues(file80h_pattern)
if len(sequence_list) < 64:
raise ValueError("序列长度不足,无法解析文件大小")
size_list = sequence_list[56:64]
size = hex_list_to_int(size_list)
return size
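Illustrative arithmetic for the non-resident branch: the eight bytes at offsets 56..63 of the merged sequence hold the real size, little-endian. With made-up sample bytes:

size_list = ['00', '40', '62', '00', '00', '00', '00', '00']  # assumed sample
assert hex_list_to_int(size_list) == 0x624000  # 6,438,912 bytes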
# [Pseudo] Get a random Length
def GetRandomLength() -> int:
return random.randint(1000, 9999)
# Get the file content hash (mock: hashes the path string, not the content)
def GetFileHash(full_path: str) -> str:
return hashlib.sha256(full_path.encode()).hexdigest()
# Main: iterate over NewDBPath and insert into NewDBNode (or a custom table)
def InsertNodeDataToDb(db_path='../src/filesystem.db', table_name='db_node'):
"""
Iterate over the NewDBPath table and insert the corresponding Node rows into the target table.
# New: get a fragment's location and length
def GetFragmentLocation(fragment):
return fragment.get('starting_byte', 0)
Args:
db_path: str, database path
table_name: str, target table name
"""
def GetFragmentLength(fragment):
return fragment.get('byte_length', 0)
# Main: import db_path data into db_node
def InsertNodeDataToDB(db_path='../src/db_ntfs_info.db', table_name='db_node', batch_size=20):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
# Create the table dynamically (if it does not exist)
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
ID INTEGER PRIMARY KEY AUTOINCREMENT,
PathID INTEGER NOT NULL,
ExtendNameID INTEGER,
GroupID INTEGER,
UserID INTEGER,
FileSize INTEGER,
FileHash TEXT,
ExtentCount INTEGER,
extent1_DeviceID INTEGER,
extent1_Location INTEGER,
extent1_Length INTEGER,
extent2_DeviceID INTEGER,
extent2_Location INTEGER,
extent2_Length INTEGER,
extent3_DeviceID INTEGER,
extent3_Location INTEGER,
extent3_Length INTEGER,
extent4_DeviceID INTEGER,
extent4_Location INTEGER,
extent4_Length INTEGER,
if len(volume_letter) == 1:
volume_root = f"{volume_letter}:\\"
elif volume_letter.endswith(':'):
volume_root = f"{volume_letter}\\"
else:
volume_root = f"{volume_letter}:\\"  # supports "Y" or "Y:" input
-- Foreign key constraints
FOREIGN KEY(PathID) REFERENCES db_path(ID),
FOREIGN KEY(ExtendNameID) REFERENCES db_extend(ID),
FOREIGN KEY(GroupID) REFERENCES db_group(ID),
FOREIGN KEY(UserID) REFERENCES db_user(ID)
);
"""
cursor.execute(create_table_sql)
print(f"🔍 当前处理磁盘根目录:{volume_root}")
# 获取所有 NewDBPath 记录
cursor.execute("SELECT ID, Name, Path, IsDir FROM db_path")
path_records = cursor.fetchall()
group_id = GetFirstGroupId(cursor)
user_id = GetFirstUserId(cursor)
device_id = GetDeviceId(cursor)
batch = []
device_id = GetDeviceId(cursor)
cursor.execute("SELECT ID, Path, Name, ParentID FROM db_path")
rows = cursor.fetchall()
for path_id, name, full_path, is_dir in path_records:
if is_dir == 1:
extend_name_id = 0
insert_fields = [
'PathID', 'ParentID', 'NameHash', 'PathHash',
'ExtendNameID', 'DirLayer', 'GroupID', 'UserID',
'FileCreateTime', 'FileModifyTime', 'FileAccessTime', 'FileAuthTime',
'FileSize', 'FileMode', 'FileHash', 'ExtentCount',
# extent fields
"extent1_DeviceID", "extent1_Location", "extent1_Length",
"extent2_DeviceID", "extent2_Location", "extent2_Length",
"extent3_DeviceID", "extent3_Location", "extent3_Length",
"extent4_DeviceID", "extent4_Location", "extent4_Length"
]
insert_placeholders = ', '.join('?' * len(insert_fields))
insert_sql = f"INSERT INTO {table_name} ({', '.join(insert_fields)}) VALUES ({insert_placeholders})"
batch = []
for row in rows:
path_id, relative_path, name, parent_id = row
full_path = os.path.join(volume_root, relative_path)
# Check whether this PathID already exists
cursor.execute("SELECT COUNT(*) FROM db_node WHERE PathID = ?", (path_id,))
exists = cursor.fetchone()[0]
if exists > 0:
print(f"⚠️ PathID {path_id} already exists, skipping insert")
continue
try:
file80h_pattern = GetFile80hPattern(full_path)
fragments = GetFragmentData(file80h_pattern)
extent_count = min(len(fragments), 4)
except Exception as e:
print(f"⚠️ 获取 ExtentCount 失败,使用默认值 0: {e}")
fragments = []
extent_count = 0
# Compute the derived fields
name_hash = hashlib.sha256(name.encode()).hexdigest()
dir_layer = GetDirLayer(relative_path)
extend_name_id = GetExtendNameId(name, cursor)
try:
file_size = GetFileSize(file80h_pattern)
except Exception as e:
print(f"⚠️ 获取文件大小失败,使用默认值 0: {e}")
file_size = 0
file_hash = GetFileHash(full_path)
# Get the time info
file_times = GetFilesTime(full_path)
create_time = file_times["FileCreateTime"]
modify_time = file_times["FileModifyTime"]
access_time = file_times["FileAccessTime"]
auth_time = file_times["FileAuthTime"]
# Look up PathHash
cursor.execute("SELECT PathHash FROM db_path WHERE ID = ?", (path_id,))
path_hash_result = cursor.fetchone()
path_hash = path_hash_result[0] if path_hash_result else ""
# Build the extent fields
extent_data = []
for i in range(4):  # at most 4 extents
if i < len(fragments):
frag = fragments[i]
location = GetFragmentLocation(frag)
length = GetFragmentLength(frag)
extent_data.extend([device_id, location, length])
else:
extend_name_id = GetExtendNameId(name, cursor)
extent_data.extend([None, None, None])
group_id = GetFirstGroupId(cursor)
user_id = GetFirstUserId(cursor)
# Build the row to insert
values = [
path_id, parent_id, name_hash, path_hash,
extend_name_id, dir_layer, group_id, user_id,
create_time, modify_time, access_time, auth_time,
file_size, 'default', file_hash, extent_count,
*extent_data
]
file_size = GetFileSize(full_path)
file_hash = GetFileHash(full_path)
extent_count = GetExtentCount(full_path)
# Build the extent data (up to 4 fragments)
extent_data = []
for i in range(extent_count):
extent_data.append((device_id, GetRandomLocation(), GetRandomLength()))
# Pad to 4 extent slots
while len(extent_data) < 4:
extent_data.append((0, 0, 0))
# Append to the insert batch
batch.append((
path_id,
extend_name_id,
group_id,
user_id,
file_size,
file_hash,
extent_count,
*extent_data[0],
*extent_data[1],
*extent_data[2],
*extent_data[3]
))
batch.append(values)
# Batch insert
insert_sql = f"""
INSERT OR IGNORE INTO {table_name} (
PathID, ExtendNameID, GroupID, UserID, FileSize, FileHash, ExtentCount,
extent1_DeviceID, extent1_Location, extent1_Length,
extent2_DeviceID, extent2_Location, extent2_Length,
extent3_DeviceID, extent3_Location, extent3_Length,
extent4_DeviceID, extent4_Location, extent4_Length
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
if len(batch) >= batch_size:
cursor.executemany(insert_sql, batch)
conn.commit()
print(f"✅ 提交一批 {len(batch)} 条记录到 {table_name}")
batch.clear()
# Insert the remaining partial batch
if batch:
cursor.executemany(insert_sql, batch)
conn.commit()
print(f"成功插入 {cursor.rowcount}{table_name} 记录")
print(f"提交最后一批 {len(batch)}记录到 {table_name}")
except Exception as e:
print(f"❌ 插入失败: {e}")
conn.rollback()
finally:
conn.close()
conn.close()
print(f"✅ 数据已成功插入到 {table_name}")
# Example invocation
if __name__ == "__main__":
InsertNodeDataToDb(db_path='../src/db_ntfs_info.db', table_name='db_node')
InsertNodeDataToDB()

View File

@@ -1,49 +1,6 @@
import hashlib
import os
import sqlite3
import time
def get_file_times(full_path):
"""
Get the file's creation, modification, and access times, formatted as strings.
Args:
full_path: str, file path
Returns:
tuple: (create_time, modify_time, access_time, auth_time)
"""
try:
stat = os.stat(full_path)
# Convert to a readable format: YYYY-MM-DD HH:MM:SS
def format_time(timestamp):
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))
create_time = format_time(stat.st_ctime)
modify_time = format_time(stat.st_mtime)
access_time = format_time(stat.st_atime)
auth_time = format_time(stat.st_ctime)  # on Windows, ctime approximates the permission-change time
return create_time, modify_time, access_time, auth_time
except Exception as e:
print(f"⚠️ 获取时间失败: {e}")
return "unknown", "unknown", "unknown", "unknown"
def get_file_mode(full_path):
"""
Get the file permission mode (simulated on Windows).
Could be extended with read-only, hidden, and similar attributes.
"""
try:
stat = os.stat(full_path)
# Simulated as 'default' on Windows; a bit mask could be parsed instead
return "default"
except Exception as e:
return "unknown"
def GenerateHash(s: str) -> str:
@@ -66,54 +23,26 @@ def ShouldSkipPath(path: str) -> bool:
return False
def GetDirLayer(full_path: str, volume_letter: str) -> int:
"""
Compute the directory depth from a path.
Examples:
Z:\demo.txt → 0
Z:\folder\test.txt → 1
Z:\folder\subfolder\file.txt → 2
Args:
full_path: str, full path
volume_letter: str, drive letter, e.g. 'Z'
Returns:
int: the depth
"""
root_prefix = f"{volume_letter.upper()}:\\"
if not full_path.startswith(root_prefix):
return -1  # invalid path
relative_path = full_path[len(root_prefix):]
if not relative_path:
return 0  # the root is depth 0
return len(relative_path.split(os.sep)) - 1
def ScanVolume(volume_letter: str):
def ScanVolume(volume_letter: str) -> list:
"""
Fully scan all files and directories on the given volume, ignoring NTFS metafiles and system folders,
and assign each node a ParentID.
Yields:
dict: one record per file/directory
"""
root_path = f"{volume_letter.upper()}:\\"
if not os.path.exists(root_path):
raise ValueError(f"磁盘 {root_path} 不存在")
result = []
path_to_id = {}  # path -> database ID mapping
counter = 1  # simulated auto-increment ID
for root, dirs, files in os.walk(root_path, topdown=True, onerror=None, followlinks=False):
# Filter out directories that should be skipped
dirs[:] = [d for d in dirs if not ShouldSkipPath(os.path.join(root, d))]
for entry in files + dirs:
entries = files + dirs
for entry in entries:
full_path = os.path.join(root, entry)
if ShouldSkipPath(full_path):
@@ -130,62 +59,61 @@ def ScanVolume(volume_letter: str):
continue
name = entry
path_hash = GenerateHash(full_path)
# Compute ContentSize (KB); small files show as at least 1 KB
# Split off the drive letter and normalize the path format
_, relative_path = os.path.splitdrive(full_path)
relative_path = relative_path.lstrip("\\").rstrip("\\")
if os.path.isdir(full_path) and not relative_path.endswith("/"):
relative_path += "/"
relative_path = relative_path.replace("\\", "/")
path_hash = GenerateHash(relative_path)
content_size = bytes_size // 1024
if content_size == 0 and bytes_size > 0:
content_size = 1
parent_path = os.path.dirname(full_path)
parent_id = path_to_id.get(parent_path, 0)
dir_layer = GetDirLayer(full_path, volume_letter)
_, parent_relative_path = os.path.splitdrive(parent_path)
parent_relative_path = parent_relative_path.lstrip("\\").rstrip("\\")
if os.path.isdir(parent_path) and not parent_relative_path.endswith("/"):
parent_relative_path += "/"
parent_relative_path = parent_relative_path.replace("\\", "/")
# Get the file time attributes
ctime, mtime, atime, chgtime = get_file_times(full_path)
mode = get_file_mode(full_path)
parent_id = path_to_id.get(parent_relative_path, 0)
item = {
"ID": counter,
"Path": full_path,
"Path": relative_path,
"Name": name,
"PathHash": path_hash,
"IsDir": is_dir,
"ParentID": parent_id,
"ContentSize": content_size,
"DirLayer": dir_layer,
"FileCreateTime": ctime,
"FileModifyTime": mtime,
"FileAccessTime": atime,
"FileAuthTime": chgtime,
"FileMode": mode
"ContentSize": content_size
}
result.append(item)
path_to_id[full_path] = counter
yield item  # yield each record as it is produced
path_to_id[relative_path] = counter
counter += 1
except Exception as e:
print(f"⚠️ 跳过路径 {full_path},错误: {e}")
return result
def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_path', batch_size=20):
def InsertPathDataToDB(data_generator, db_path='../src/db_ntfs_info.db', table_name='db_path', batch_size=20):
"""
Stream the scan results into the database, inserting while scanning.
:param data_generator: an iterable (e.g. a generator)
:param db_path: SQLite database path
:param table_name: target table name
:param batch_size: how many records to commit per batch
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
# Create the table (if it does not exist)
create_table_sql = f"""
CREATE TABLE IF NOT EXISTS {table_name} (
ID INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -195,80 +123,60 @@ def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_pa
IsDir INTEGER NOT NULL CHECK(IsDir IN (0, 1)),
ParentID INTEGER,
ContentSize INTEGER,
DirLayer INTEGER NOT NULL,
FileCreateTime TEXT,
FileModifyTime TEXT,
FileAccessTime TEXT,
FileAuthTime TEXT,
FileMode TEXT,
FOREIGN KEY(ParentID) REFERENCES {table_name}(ID)
);
"""
cursor.execute(create_table_sql)
# 插入语句(忽略重复 PathHash
insert_sql = f"""
INSERT OR IGNORE INTO {table_name}
(Path, Name, PathHash, IsDir, ParentID, ContentSize, DirLayer, FileCreateTime, FileModifyTime, FileAccessTime, FileAuthTime, FileMode)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
(Path, Name, PathHash, IsDir, ParentID, ContentSize)
VALUES (?, ?, ?, ?, ?, ?)
"""
total_inserted = 0
batch = []
for item in data:
for item in data_generator:
batch.append((
item['Path'],
item['Name'],
item['PathHash'],
item['IsDir'],
item['ParentID'] or 0,
item['ContentSize'],
item['DirLayer'],
item['FileCreateTime'],
item['FileModifyTime'],
item['FileAccessTime'],
item['FileAuthTime'],
item['FileMode']
item['ContentSize']
))
if len(batch) >= batch_size:
cursor.executemany(insert_sql, batch)
conn.commit()
total_inserted += cursor.rowcount
print(f"✅ 提交一批 {len(batch)} 条数据")
batch.clear()
# Commit the remaining partial batch
if batch:
cursor.executemany(insert_sql, batch)
conn.commit()
total_inserted += cursor.rowcount
print(f"✅ 提交最后一批 {len(batch)} 条数据")
print(f"✅ 总共插入 {total_inserted} 条记录到 NewDBPath 表。")
except Exception as e:
print(f"❌ 插入失败: {e}")
conn.rollback()
finally:
conn.close()
# Example main function
def main():
volume_letter = "Z"
def DBPathMain(volume_letter: str):
print(f"🔍 开始全盘扫描磁盘 {volume_letter}:\\ ...")
scanned_data = ScanVolume(volume_letter)
print(f"📊 共扫描到 {len(scanned_data)} 条有效记录,开始入库...")
InsertPathDataToDB(scanned_data)
# 获取生成器对象
generator = ScanVolume(volume_letter)
print("✅ 全盘扫描与 NewDBPath 表入库完成")
print(f"📊 开始逐批入库...")
InsertPathDataToDB(generator)
print("✅ 全盘扫描与入库完成")
if __name__ == "__main__":
main()
DBPathMain(volume_letter="Y")

View File

@@ -1,14 +1,15 @@
from db_config import GetNTFSBootInfo, InsertInfoToDBConfig
from db_device import ScanSpecialVolumes, InsertVolumesToDB
from db_extend_name import InsertExtensionsToDB
from db_group import InsertGroupToDB
from db_path import GenerateHash, ShouldSkipPath, ScanVolume, InsertPathDataToDB
from db_user import InsertUserToDB
from ntfs_utils.db_config import GetNTFSBootInfo, InsertInfoToDBConfig
from ntfs_utils.db_device import ScanSpecialVolumes, InsertVolumesToDB
from ntfs_utils.db_extend_name import InsertExtensionsToDB
from ntfs_utils.db_group import InsertGroupToDB
# from ntfs_utils.db_node import InsertNodeDataToDB
from ntfs_utils.db_path import DBPathMain
from ntfs_utils.db_user import InsertUserToDB
volume_letter = 'Y'
def main():
volume_letter = 'Z'
# Initialize the db_config table
config_data = GetNTFSBootInfo(volume_letter)
InsertInfoToDBConfig(config_data)
@@ -25,10 +26,6 @@ def main():
group_name_list = ["Copier"]
InsertGroupToDB(group_name_list)
# Initialize the db_path table
scanned_data = ScanVolume(volume_letter)
InsertPathDataToDB(scanned_data)
# Initialize the db_extend_name table
common_extensions = [
"txt", "log", "csv", "xls", "xlsx", "doc", "docx",
@@ -40,6 +37,12 @@ def main():
count = InsertExtensionsToDB(common_extensions)
print(f"共插入 {count} 个新扩展名。")
# 初始化 db_path 表
DBPathMain(volume_letter=volume_letter)
# 初始化 db_node 表
# InsertNodeDataToDB(volume_letter)
if __name__ == '__main__':
main()

View File

@@ -1,9 +1,8 @@
import os
from typing import Any
import pytsk3
from db_config import GetNTFSBootInfo
from ntfs_utils.db_config import GetNTFSBootInfo
def find_file_mft_entry(fs, target_path):
@@ -61,7 +60,7 @@ def GetFileMftEntry(file_path):
drive_letter = os.path.splitdrive(file_path)[0][0]
device = f"\\\\.\\{drive_letter}:"
print(f"Opening device: {device}")
# print(f"Opening device: {device}")
try:
img = pytsk3.Img_Info(device)
@@ -74,9 +73,10 @@ def GetFileMftEntry(file_path):
root_path = f"{drive_letter}:\\"
rel_path = os.path.relpath(abs_path, root_path).replace("/", "\\")
print(f"Looking up MFT entry for: {rel_path}")
# print(f"Looking up MFT entry for: {rel_path}")
mft_entry = find_file_mft_entry(fs, rel_path)
# print(f"MFT Entry: {mft_entry}")
if mft_entry is None:
raise RuntimeError("Could not find MFT entry for the specified file.")
@@ -103,7 +103,9 @@ def CalculateFileMftStartSector(mft_entry, volume_letter="Z"):
config_data = GetNTFSBootInfo(volume_letter)
# Compute the starting sector of the file's MFT entry
start_sector = config_data["MftPosition"] * 8 + mft_entry * 2
if start_sector < 0:
raise ValueError("The starting sector number cannot be negative")
# print(f"Starting sector of the file's MFT entry: {start_sector}")
return start_sector
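The formula implies 1 KB MFT entries on 512-byte sectors (two sectors per entry) and eight sectors per cluster. Illustrative arithmetic with a made-up MftPosition:

mft_position = 786432  # hypothetical MFT start, in clusters
mft_entry = 5
start_sector = mft_position * 8 + mft_entry * 2  # = 6291466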
@@ -215,10 +217,192 @@ def GetFile80hPattern(file_path):
try:
mft_entry_value = GetFileMftEntry(file_path)
StartSector = CalculateFileMftStartSector(mft_entry_value, volume_letter)
print(Get80hPattern(StartSector, volume_letter))
# print(f"文件的相关信息以及80属性内容")
# print(Get80hPattern(StartSector, volume_letter))
file80h_pattern = Get80hPattern(StartSector, volume_letter)
return file80h_pattern
except Exception as e:
print(f"❌ Error: {e}")
return None
if __name__ == '__main__':
GetFile80hPattern(r"Z:\demo.jpg")
# if __name__ == '__main__':
# data = GetFile80hPattern(r"Z:\hello.txt")
# print(data)
def ExtractSequenceHexValues(file80h_pattern):
"""
Extract all 'sequence' hex strings from the given structure and merge them into one flat list.
Args:
file80h_pattern (list): list of dicts, each containing a 'sequence' key
Returns:
list: the merged list of all sequence values
"""
sequence_list = []
for entry in file80h_pattern:
if 'sequence' in entry:
# Split each hex string on spaces and merge into the result list
for hex_str in entry['sequence']:
sequence_list.extend(hex_str.split())
return sequence_list
def ExportDataRunList(data_run_list):
"""
Split data_run_list into independent Data Run fragments.
"""
result = []
pos = 0
while pos < len(data_run_list):
current_byte = data_run_list[pos]
if current_byte == '00':
break
try:
header = int(current_byte, 16)
len_bytes = (header >> 4) & 0x0F
offset_bytes = header & 0x0F
run_length = 1 + offset_bytes + len_bytes
if pos + run_length > len(data_run_list):
print(f"⚠️ 数据越界,停止解析")
break
fragment = data_run_list[pos: pos + run_length]
result.append(fragment)
pos += run_length
except Exception as e:
print(f"❌ 解析 Data Run 失败:位置 {pos}, 错误: {e}")
pos += 1 # 跳过一个字节继续解析
return result
def hex_list_to_int(lst, byteorder='little'):
"""
Convert a list of hex strings to an integer (little-endian supported).
"""
if byteorder == 'little':
lst = list(reversed(lst))
return int(''.join(f"{int(b, 16):02x}" for b in lst), 16)
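A one-line sanity check of the little-endian conversion:

assert hex_list_to_int(['00', 'ee', '0b']) == 0x0BEE00  # 781824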
def parse_data_run(data_run, previous_cluster=0, cluster_size=512):
"""
解析 NTFS 单个 Data Run返回起始字节、结束字节、长度字节
参数:
data_run (list): Data Run 的十六进制字符串列表
previous_cluster (int): 上一个运行块的最后一个簇号(用于相对偏移)
cluster_size (int): 簇大小(默认为 512 字节)
返回:
dict: 包含起始字节、结束字节、长度等信息
"""
if not data_run or data_run[0] == '00':
return None
header = int(data_run[0], 16)
# NTFS data-run header: low nibble = size of the run-length field,
# high nibble = size of the cluster-offset field; the length field comes first
length_field_bytes = header & 0x0F
offset_field_bytes = (header >> 4) & 0x0F
if len(data_run) < 1 + length_field_bytes + offset_field_bytes:
print("⚠️ Not enough data to parse this Data Run")
return None
# Extract the length field, then the offset field
length_data = data_run[1:1 + length_field_bytes]
offset_data = data_run[1 + length_field_bytes:1 + length_field_bytes + offset_field_bytes]
# Little-endian to integer (module-level helper)
run_length = hex_list_to_int(length_data)
offset = hex_list_to_int(offset_data)
# Compute the starting cluster (offsets are relative to the previous run's start)
starting_cluster = previous_cluster + offset
ending_cluster = starting_cluster + run_length - 1
# Convert clusters to byte offsets (assuming 8 sectors per cluster)
sectors_per_cluster = 8
bytes_per_sector = cluster_size
starting_byte = starting_cluster * sectors_per_cluster * bytes_per_sector
byte_length = run_length * sectors_per_cluster * bytes_per_sector
ending_byte = starting_byte + byte_length - 1
return {
"starting_byte": starting_byte,
"ending_byte": ending_byte,
"byte_length": byte_length,
"starting_cluster": starting_cluster,
"run_length_clusters": run_length
}
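Working through the sample run ['31', '7a', '00', 'ee', '0b'] from the scratch script later in this diff (first run, so previous_cluster is 0):

result = parse_data_run(['31', '7a', '00', 'ee', '0b'])
# header 0x31: 1-byte length field (0x7a = 122 clusters),
# 3-byte offset field (0x0bee00 = 781824 clusters)
assert result["run_length_clusters"] == 122
assert result["starting_cluster"] == 781824
assert result["starting_byte"] == 781824 * 8 * 512  # 3,202,351,104
assert result["byte_length"] == 122 * 8 * 512       # 499,712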
def ParseMultipleDataRuns(fragments, cluster_size=512):
"""
Parse multiple Data Run fragments in bulk and return byte-offset info.
Args:
fragments (list): list of Data Run hex-string lists
cluster_size (int): cluster size (defaults to 512)
Returns:
list: one dict of byte-offset info per fragment
"""
results = []
previous_starting_cluster = 0
for fragment in fragments:
result = parse_data_run(fragment, previous_starting_cluster, cluster_size)
if result:
results.append(result)
previous_starting_cluster = result["starting_cluster"]
return results
def GetFragmentData(file80h_pattern):
if not file80h_pattern or not isinstance(file80h_pattern, list):
return []
if file80h_pattern[0].get('is_resident'):
start_byte = file80h_pattern[0].get('start_byte')
offset = file80h_pattern[0].get('offset')
content_start = file80h_pattern[0].get('sequence')[2]
content_start_list = content_start.split()
content_len = content_start_list[::-1][4:8]
content_offset = content_start_list[::-1][:4]
content_len_str = ''.join(content_len)
content_len_decimal_value = int(content_len_str, 16)
content_offset_str = ''.join(content_offset)
content_offset_decimal_value = int(content_offset_str, 16)
file_offset = start_byte + offset + content_offset_decimal_value
return [{
'starting_byte': file_offset,
'byte_length': content_len_decimal_value
}]
else:
sequence_list = ExtractSequenceHexValues(file80h_pattern)
data_run_offset = sequence_list[32:34][::-1]
data_run_offset_str = ''.join(data_run_offset)
data_run_offset_decimal_value = int(data_run_offset_str, 16)
data_run_list = sequence_list[data_run_offset_decimal_value:]
fragments = ExportDataRunList(data_run_list)
results = ParseMultipleDataRuns(fragments)
return results
# if __name__ == '__main__':
# arri80_data = GetFile80hPattern(r"Z:\hello.txt")
# data = GetFragmentData(arri80_data)
# print(data)

View File

@@ -6,5 +6,4 @@ readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"psutil>=7.0.0",
"pytsk3>=20250312",
]

View File

@@ -0,0 +1,139 @@
def extract_data_run_fragments(data_run):
"""
Extract the multiple Data Runs inside data_run as independent list fragments.
Args:
data_run (list): list of hex strings forming the Data Run content
Returns:
list: one list per individual Data Run
"""
result = []
pos = 0
while pos < len(data_run):
current_byte = data_run[pos]
if current_byte == '00':
# Empty run block: stop parsing
break
try:
header = int(current_byte, 16)
len_bytes = (header >> 4) & 0x0F
offset_bytes = header & 0x0F
if len_bytes == 0 or offset_bytes == 0:
print(f"⚠️ Invalid field sizes, stopping at position {pos}")
break
# Total length of the current Data Run
run_length = 1 + offset_bytes + len_bytes
# Slice out the current Data Run
fragment = data_run[pos: pos + run_length]
result.append(fragment)
# Advance the pointer
pos += run_length
except Exception as e:
print(f"❌ Parse failed at position {pos}: {e}")
break
return result
def hex_list_to_int(lst, byteorder='little'):
"""
Convert a list of hex strings to an integer (little-endian supported).
"""
if byteorder == 'little':
lst = list(reversed(lst))
return int(''.join(f"{int(b, 16):02x}" for b in lst), 16)
def parse_data_run(data_run, previous_cluster=0):
"""
解析 NTFS 单个 Data Run返回起始簇号和结束簇号
参数:
data_run (list): Data Run 的十六进制字符串列表
previous_cluster (int): 上一个运行块的最后一个簇号(用于相对偏移)
返回:
dict: 包含起始簇、结束簇、运行长度等信息
"""
if not data_run or data_run[0] == '00':
return None
header = int(data_run[0], 16)
# NTFS data-run header: low nibble = size of the run-length field,
# high nibble = size of the cluster-offset field
length_field_bytes = header & 0x0F
offset_field_bytes = (header >> 4) & 0x0F
# Extract the length field first, then the offset field
length_data = data_run[1:1 + length_field_bytes]
offset_data = data_run[1 + length_field_bytes:1 + length_field_bytes + offset_field_bytes]
# Parse length and offset (little-endian)
run_length = hex_list_to_int(length_data, 'little')
offset = hex_list_to_int(offset_data, 'little')
# Starting cluster (absolute for the first run, relative afterwards)
starting_cluster = previous_cluster + offset
ending_cluster = starting_cluster + run_length - 1
return {
"starting_cluster": starting_cluster,
"ending_cluster": ending_cluster,
"run_length": run_length
}
def parse_multiple_data_runs(fragments):
"""
Parse multiple Data Run fragments in bulk, handling relative offsets.
Args:
fragments (list): list of Data Run hex-string lists, e.g.:
[
['31', '7a', '00', 'ee', '0b'],
['22', '29', '06', 'bb', '00'],
...
]
Returns:
list: one dict of parse results per fragment
"""
results = []
previous_starting_cluster = 0
for fragment in fragments:
result = parse_data_run(fragment, previous_starting_cluster)
if result:
results.append(result)
previous_starting_cluster = result["starting_cluster"]
return results
data_run = [
'31', '7a', '00', 'ee', '0b',
'22', '29', '06', 'bb', '00',
'32', '7a', '02', 'ee', '00', '00',
'00', 'a0', 'f8', 'ff', 'ff', 'ff', 'ff', 'ff'
]
# Step 1: extract all valid fragments
fragments = extract_data_run_fragments(data_run)
print("Extracted fragments:")
for i, frag in enumerate(fragments):
print(f"Fragment {i + 1}: {frag}")
# Step 2: parse the fragments in bulk
results = parse_multiple_data_runs(fragments)
print("\nParse results:")
for i, res in enumerate(results):
print(f"Fragment {i + 1}: {res}")

36
test/fake_main.py Normal file
View File

@@ -0,0 +1,36 @@
from files_save import CopyMultiFragmentFiles, CopySingleFragmentFiles
target_path = r"Z:\Recovered"
# Holds each file's list of fragment contents
fragment_lists = {}
test_file_sort = [{'absolute_path': 'Y:\\CloudMusic\\AGA - 一.mp3', 'filename': 'AGA - 一.mp3', 'extent_count': 2,
'start_byte': 23162880, 'length': 69632, 'fragment_index': 1},
{'absolute_path': 'Y:\\CloudMusic\\Aaron Zigman - Main Title.mp3',
'filename': 'Aaron Zigman - Main Title.mp3', 'extent_count': 1, 'start_byte': 687685632,
'length': 7163904, 'fragment_index': 1},
{'absolute_path': 'Y:\\CloudMusic\\AGA - MIZU.mp3', 'filename': 'AGA - MIZU.mp3', 'extent_count': 1,
'start_byte': 694849536, 'length': 8126464, 'fragment_index': 1},
{'absolute_path': 'Y:\\CloudMusic\\AGA - 一.mp3', 'filename': 'AGA - 一.mp3', 'extent_count': 2,
'start_byte': 702976000, 'length': 10870784, 'fragment_index': 2},
{'absolute_path': 'Y:\\CloudMusic\\Anson Seabra - Keep Your Head Up Princess.mp3',
'filename': 'Anson Seabra - Keep Your Head Up Princess.mp3', 'extent_count': 1,
'start_byte': 713846784, 'length': 7970816, 'fragment_index': 1}, {
'absolute_path': 'Y:\\CloudMusic\\Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3',
'filename': 'Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3',
'extent_count': 1, 'start_byte': 721817600, 'length': 9179136, 'fragment_index': 1},
{'absolute_path': 'Y:\\CloudMusic\\Ava Max - Sweet but Psycho.mp3',
'filename': 'Ava Max - Sweet but Psycho.mp3', 'extent_count': 1, 'start_byte': 731000832,
'length': 7938048, 'fragment_index': 1},
{'absolute_path': 'Y:\\CloudMusic\\Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3',
'filename': 'Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3', 'extent_count': 1,
'start_byte': 738938880, 'length': 6791168, 'fragment_index': 1},
{'absolute_path': 'Y:\\CloudMusic\\Color Music Choir - Something Just Like This (Live).mp3',
'filename': 'Color Music Choir - Something Just Like This (Live).mp3', 'extent_count': 1,
'start_byte': 745730048, 'length': 6193152, 'fragment_index': 1}]
for item in test_file_sort:
extent_count = item['extent_count']
if extent_count == 1:
CopySingleFragmentFiles(item, target_path)
elif extent_count > 1:
CopyMultiFragmentFiles(item, fragment_lists, target_path)

160
test/files_save.py Normal file
View File

@@ -0,0 +1,160 @@
import os
def ExtractVolumeLetter(path: str) -> str:
"""从绝对路径中提取盘符"""
drive = os.path.splitdrive(path)[0]
if not drive:
raise ValueError(f"无法从路径中提取盘符:{path}")
return drive[0].upper() # 返回 'Y'
def CopySingleFragmentFiles(source_data_dict, target_path):
"""
Read data from the disk at the given start byte and length, and save it as the target file.
:param source_data_dict: dict with the source data info
:param target_path: target folder path
"""
start_byte = source_data_dict.get("start_byte")
byte_length = source_data_dict.get("length")
absolute_path = source_data_dict.get("absolute_path")
file_name = source_data_dict.get("filename")
if byte_length <= 0:
print("Error: invalid byte length")
return
if not absolute_path or not file_name:
print("Error: missing required file information")
return
source_disk_path = ExtractVolumeLetter(absolute_path)
target_file_path = os.path.join(target_path, file_name)
try:
# Create the target directory (if it does not exist)
os.makedirs(target_path, exist_ok=True)
with open(fr"\\.\{source_disk_path}:", 'rb') as disk:
disk.seek(start_byte)
with open(target_file_path, 'wb') as f:
remaining = byte_length
CHUNK_SIZE = 1024 * 1024 # 1MB
while remaining > 0:
read_size = min(CHUNK_SIZE, remaining)
chunk = disk.read(read_size)
if not chunk:
print("警告:读取到空数据,可能已到达磁盘末尾。")
break
f.write(chunk)
remaining -= len(chunk)
print(
f"Success: read {byte_length} bytes from byte offset {start_byte}, saved as {target_file_path}")
except PermissionError:
print("Error: administrator privileges are required to access the disk device; run this program as administrator")
except Exception as e:
print(f"An error occurred: {str(e)}")
def ReadDiskBytes(volume_letter: str, start_byte: int, length: int) -> bytes:
"""
Read a given number of bytes from the given disk starting at the given position.
:param volume_letter: drive letter (e.g. "Y")
:param start_byte: starting byte position (int)
:param length: number of bytes to read (int)
:return: the raw bytes read
"""
if not isinstance(volume_letter, str) or len(volume_letter.strip()) != 1:
raise ValueError("volume_letter must be a single letter, e.g. 'Y'")
# Build the Windows device path, e.g. \\.\Y:
disk_path = f"\\\\.\\{volume_letter.strip().upper()}:"
try:
with open(disk_path, "rb") as disk:
disk.seek(start_byte)
data = disk.read(length)
return data
except PermissionError:
raise PermissionError("Insufficient permissions; run the program as administrator")
except Exception as e:
raise RuntimeError(f"Disk read failed: {e}")
# if __name__ == "__main__":
# drive = "Y"
# start = 687685632
# size = 7163904
#
# try:
# content = ReadDiskBytes(drive, start, size)
# print(f"成功读取 {len(content)} 字节内容。前100字节为")
# print(content[:100])
# except Exception as e:
# print("错误:", e)
def CopyMultiFragmentFiles(
item: dict,
fragment_lists: dict,
target_path: str
):
"""
Handle a file with multiple fragments: read each fragment, assemble them in order, and write the result to disk.
:param item: dict with the file's fragment info
:param fragment_lists: dict holding each file's fragment contents
:param target_path: target path for the recovered file
:return: None
"""
file_name = item['filename']
extent_count = item['extent_count']
fragment_index = item['fragment_index']
start_byte = item['start_byte']
length_byte = item['length']
volume_letter = ExtractVolumeLetter(item['absolute_path'])
# Read the fragment content
fragment_content = ReadDiskBytes(volume_letter, start_byte, length_byte)
# Initialize the fragment list for this file if it does not exist yet
if file_name not in fragment_lists:
fragment_lists[file_name] = [None] * extent_count
# Insert the content at its position
if fragment_index <= extent_count:
fragment_lists[file_name][fragment_index - 1] = fragment_content
print(f"Wrote fragment {fragment_index} of {file_name}.")
else:
print(f"Warning: fragment_index out of range for {file_name}: {fragment_index} / {extent_count}")
# Check whether all fragments have been loaded
fragments = fragment_lists[file_name]
if None not in fragments:
full_content = b''.join(fragments)
target_file_path = os.path.join(target_path, file_name)
try:
with open(target_file_path, 'wb') as f:
f.write(full_content)
print(f"Successfully recovered file: {file_name}")
except Exception as e:
print(f"Failed to write file: {file_name}, error: {e}")
if __name__ == "__main__":
test_dict = {
'absolute_path': 'Y:\\CloudMusic\\Aaron Zigman - Main Title.mp3',
'filename': 'Aaron Zigman - Main Title.mp3',
'extent_count': 1,
'start_byte': 687685632,
'length': 7163904,
'fragment_index': 1
}
CopySingleFragmentFiles(test_dict, target_path=r"Z:\RecoveredFiles")

232
test/files_sort.py Normal file
View File

@@ -0,0 +1,232 @@
import sqlite3
def GetFilesDBPathInfo(db_path: str = "../src/db_ntfs_info.db",
table_name: str = "db_path",
files_path=None) -> list:
"""
For each file path in the given list, look up the matching record's ID and Name in the given table.
:param db_path: database file path
:param table_name: table to query
:param files_path: list of full file paths
:return: list of results, each {'absolute_path': str, 'id': int, 'name': str}
"""
if files_path is None:
files_path = []
results = []
# Connect to the database
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
for path in files_path:
try:
# The table name is formatted into the SQL string; parameter placeholders only bind values
sql = f"SELECT ID, Name FROM {table_name} WHERE Path = ?"
cursor.execute(sql, (path,))
row = cursor.fetchone()
if row:
results.append({
'absolute_path': path,
'id': row[0],
'name': row[1]
})
else:
print(f"未找到匹配记录:{path}")
except Exception as e:
print(f"查询失败:{path},错误:{e}")
conn.close()
return results
# if __name__ == "__main__":
# test_files = [
# r"CloudMusic/AGA - MIZU.mp3",
# r"CloudMusic/AGA - 一.mp3",
# r"CloudMusic/Aaron Zigman - Main Title.mp3",
# r"CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3",
# r"CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3",
# r"CloudMusic/Ava Max - Sweet but Psycho.mp3",
# r"CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3",
# r"CloudMusic/Color Music Choir - Something Just Like This (Live).mp3"
# ]
#
# result = GetFilesDBPathInfo(files_path=test_files)
# for item in result:
# print(item)
def GetFilesDBNodeInfo(db_path: str = "../src/db_ntfs_info.db", table_name: str = "db_node",
path_records: list = None) -> list:
"""
Use the IDs from the db_path query results to look up the corresponding extent (fragment) info in db_node.
:param db_path: database file path
:param table_name: db_node table name
:param path_records: result list from GetFilesDBPathInfo
:return: result list with each file's fragment info
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
results = []
for record in path_records:
path_id = record['id']
absolute_path = record['absolute_path']
name = record['name']
try:
# Look up the record with this PathID in the db_node table
cursor.execute(f"SELECT * FROM {table_name} WHERE PathID = ?", (path_id,))
row = cursor.fetchone()
if not row:
print(f"No record for PathID={path_id} in table {table_name}")
continue
# Get the column names (for access by name)
columns = [desc[0] for desc in cursor.description]
# Build a dict keyed by column name
node_data = dict(zip(columns, row))
# Get ExtentCount
extent_count = node_data.get("ExtentCount", 0)
# Parse the fragment info
fragments = []
for i in range(1, 5): # extent1 ~ extent4
loc = node_data.get(f"extent{i}_Location")
length = node_data.get(f"extent{i}_Length")
if loc is not None and length is not None and length > 0:
fragments.append({
"start_byte": loc,
"length": length
})
results.append({
"absolute_path": absolute_path,
"name": name,
"path_id": path_id,
"extent_count": extent_count,
"fragments": fragments
})
except Exception as e:
print(f"查询失败PathID={path_id}, 错误:{e}")
conn.close()
return results
if __name__ == "__main__":
test_files = [
r"CloudMusic/AGA - MIZU.mp3",
r"CloudMusic/AGA - 一.mp3",
r"CloudMusic/Aaron Zigman - Main Title.mp3",
r"CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3",
r"CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3",
r"CloudMusic/Ava Max - Sweet but Psycho.mp3",
r"CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3",
r"CloudMusic/Color Music Choir - Something Just Like This (Live).mp3"
]
# Step 1: get ID and Name from the db_path table
path_info = GetFilesDBPathInfo(files_path=test_files)
# Step 2: look up the fragment info in db_node by PathID
file_extents_info = GetFilesDBNodeInfo(path_records=path_info)
# Print the results
for item in file_extents_info:
print(item)
def sort_fragments_by_start_byte(file_extents_list: list) -> list:
"""
Sort all files' fragments by start_byte and tag each with its fragment index.
:param file_extents_list: result list from GetFilesDBNodeInfo
:return: fragment list sorted by start_byte, with file path, file name, fragment index, etc.
"""
all_fragments = []
for file_info in file_extents_list:
absolute_path = file_info['absolute_path']
filename = file_info['name']
extent_count = file_info['extent_count']
fragments = file_info['fragments']
# Sort this file's fragments (they are usually already in order)
sorted_fragments = sorted(fragments, key=lambda x: x['start_byte'])
# Attach the fragment index
for idx, fragment in enumerate(sorted_fragments, start=1):
all_fragments.append({
'absolute_path': absolute_path,
'filename': filename,
'extent_count': extent_count,
'start_byte': fragment['start_byte'],
'length': fragment['length'],
'fragment_index': idx
})
# Global sort: order all fragments by start_byte
all_fragments.sort(key=lambda x: x['start_byte'])
return all_fragments
# if __name__ == "__main__":
# test_files = [
# r"CloudMusic/AGA - MIZU.mp3",
# r"CloudMusic/AGA - 一.mp3",
# r"CloudMusic/Aaron Zigman - Main Title.mp3",
# r"CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3",
# r"CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3",
# r"CloudMusic/Ava Max - Sweet but Psycho.mp3",
# r"CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3",
# r"CloudMusic/Color Music Choir - Something Just Like This (Live).mp3"
# ]
# test_files_sort = [
# {'absolute_path': 'CloudMusic/AGA - MIZU.mp3', 'name': 'AGA - MIZU.mp3', 'path_id': 6, 'extent_count': 1,
# 'fragments': [{'start_byte': 694849536, 'length': 8126464}]},
# {'absolute_path': 'CloudMusic/AGA - 一.mp3', 'name': 'AGA - 一.mp3', 'path_id': 7, 'extent_count': 2,
# 'fragments': [{'start_byte': 702976000, 'length': 10870784}, {'start_byte': 23162880, 'length': 69632}]},
# {'absolute_path': 'CloudMusic/Aaron Zigman - Main Title.mp3', 'name': 'Aaron Zigman - Main Title.mp3',
# 'path_id': 5, 'extent_count': 1, 'fragments': [{'start_byte': 687685632, 'length': 7163904}]},
# {'absolute_path': 'CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3',
# 'name': 'Anson Seabra - Keep Your Head Up Princess.mp3', 'path_id': 8, 'extent_count': 1,
# 'fragments': [{'start_byte': 713846784, 'length': 7970816}]},
# {'absolute_path': 'CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3',
# 'name': 'Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3', 'path_id': 9,
# 'extent_count': 1, 'fragments': [{'start_byte': 721817600, 'length': 9179136}]},
# {'absolute_path': 'CloudMusic/Ava Max - Sweet but Psycho.mp3', 'name': 'Ava Max - Sweet but Psycho.mp3',
# 'path_id': 10, 'extent_count': 1, 'fragments': [{'start_byte': 731000832, 'length': 7938048}]},
# {'absolute_path': 'CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3',
# 'name': 'Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3', 'path_id': 11, 'extent_count': 1,
# 'fragments': [{'start_byte': 738938880, 'length': 6791168}]},
# {'absolute_path': 'CloudMusic/Color Music Choir - Something Just Like This (Live).mp3',
# 'name': 'Color Music Choir - Something Just Like This (Live).mp3', 'path_id': 12, 'extent_count': 1,
# 'fragments': [{'start_byte': 745730048, 'length': 6193152}]}]
#
# path_info = GetFilesDBPathInfo(files_path=test_files)
# file_extents_data = GetFilesDBNodeInfo(path_records=path_info)
#
#     # Sort the fragments of the real files by position
# single_fragment_result = sort_fragments_by_start_byte(file_extents_data)
#
#     # Simulated multi-file fragments, sorted by fragment position
# multi_fragment_result = sort_fragments_by_start_byte(test_files_sort)
#
# print("单文件片段排序结果:")
# for item in single_fragment_result:
# print(item)
#
# print("\n多文件片段排序结果")
# for item in multi_fragment_result:
# print(item)
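# A tiny self-contained check of the ordering (hypothetical inline data, no
# database needed): one fragmented file and one contiguous file.
#
#     demo = [
#         {'absolute_path': 'a.mp3', 'name': 'a.mp3', 'extent_count': 2,
#          'fragments': [{'start_byte': 500, 'length': 10}, {'start_byte': 100, 'length': 10}]},
#         {'absolute_path': 'b.mp3', 'name': 'b.mp3', 'extent_count': 1,
#          'fragments': [{'start_byte': 300, 'length': 10}]},
#     ]
#     for frag in sort_fragments_by_start_byte(demo):
#         print(frag['start_byte'], frag['filename'], frag['fragment_index'])
#     # Expected output order: 100 a.mp3 1 / 300 b.mp3 1 / 500 a.mp3 2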

test/folders_sort.py Normal file

@@ -0,0 +1,199 @@
import os
import sqlite3
from files_sort import GetFilesDBNodeInfo, sort_fragments_by_start_byte
def GetFolderID(
folder_path: str,
db_path: str = "../src/db_ntfs_info.db",
table_name: str = "db_path"
) -> int | None:
"""
根据文件夹路径,查询数据库中该文件夹对应的 ID。
:param folder_path: 文件夹路径(如 r"CloudMusic\\"
:param db_path: 数据库文件路径
:param table_name: 要查询的数据表名称,默认为 'db_path'
:return: 成功则返回 IDint失败返回 None
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
    try:
        # Build the SQL query against the given table_name
        sql = f"SELECT ID FROM {table_name} WHERE Path = ?"
        cursor.execute(sql, (folder_path,))
        result = cursor.fetchone()
        if result:
            return result[0]
        else:
            print(f"Path not found: {folder_path} in table {table_name}")
            return None
    except sqlite3.Error as e:
        print(f"Database operation failed: {e}")
        return None
    finally:
        conn.close()
def GetSubPathsByParentID(
parent_id: int,
db_path: str = "../src/db_ntfs_info.db",
table_name: str = "db_path"
) -> list:
"""
根据 ParentID 查询 db_path 表中对应的子项(文件/文件夹)。
:param parent_id: 父节点 ID
:param db_path: 数据库文件路径
:param table_name: 数据表名称
:return: 包含 ID、Path、Name 的字典列表
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
sql = f"""
SELECT ID, Path, Name
FROM {table_name}
WHERE ParentID = ?
"""
    try:
        cursor.execute(sql, (parent_id,))
        rows = cursor.fetchall()
    except Exception as e:
        print(f"Database query failed: {e}")
        return []
    finally:
        # Close the connection on both the success and the error path
        conn.close()
    results = []
    for row in rows:
        results.append({
            'id': row[0],
            'absolute_path': row[1],
            'name': row[2]
        })
    return results
if __name__ == "__main__":
test_folder_path = "pictures/"
parent_id_test = GetFolderID(test_folder_path)
# node_data = GetNodeFragmentsByParentID(parent_id_test)
path_data = GetSubPathsByParentID(parent_id_test)
node_data = GetFilesDBNodeInfo(path_records=path_data)
for data in node_data:
print(data)
def GetSortFragmentsByFolderPath(db_path: str = "../src/db_ntfs_info.db", folder_path: str = None) -> list:
"""
根据文件夹路径,查询数据库中该文件夹下的所有文件的分片信息。
:param db_path: 要查询的数据库
:param folder_path: 文件夹的绝对路径
:return list: 文件夹下所有文件按片段顺序排列的列表
"""
parent_id = GetFolderID(folder_path=folder_path, db_path=db_path)
path_data = GetSubPathsByParentID(parent_id=parent_id, db_path=db_path)
node_data = GetFilesDBNodeInfo(path_records=path_data)
result = sort_fragments_by_start_byte(node_data)
return result
# if __name__ == "__main__":
# folder_path_test = "pictures/"
# data = GetSortFragmentsByFolderPath(db_path="../src/db_ntfs_info.db", folder_path=folder_path_test)
# for item in data:
# print(item)
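# Note that GetSubPathsByParentID only returns direct children. A hedged sketch
# (assuming subfolders appear in db_path as rows whose ID serves as the ParentID
# of their own children; file IDs simply match no rows) of a recursive walk:
#
#     def WalkSubPaths(parent_id: int, db_path: str = "../src/db_ntfs_info.db") -> list:
#         collected = []
#         for item in GetSubPathsByParentID(parent_id, db_path=db_path):
#             collected.append(item)
#             # Recurse one level deeper; leaf entries return empty lists
#             collected.extend(WalkSubPaths(item['id'], db_path=db_path))
#         return collected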
def ScanDirectory(root_dir, skip_system=True):
"""
递归扫描指定目录,返回相对于盘符的路径列表(使用 '/' 分隔),不包含盘符。
:param root_dir: 要扫描的根目录路径
:param skip_system: 是否跳过系统目录(默认 True
:return: 文件路径列表,格式为 relative/path/to/file.ext
"""
file_list = []
for root, dirs, files in os.walk(root_dir):
        # Skip NTFS system directories
        if skip_system:
            dirs[:] = [d for d in dirs if not d.startswith('$') and d != "System Volume Information"]
        for file in files:
            full_path = os.path.join(root, file)
            # Drop the drive letter
            _, relative_path = os.path.splitdrive(full_path)
            # Normalize \ to /
            relative_path = relative_path.lstrip("\\").replace("\\", "/")
file_list.append(relative_path)
return file_list
# if __name__ == "__main__":
# folder_path = r"Y:/folder1/"
# files_list = ScanDirectory(folder_path)
#
# print(f"共找到 {len(files_list)} 个文件:")
# for f in files_list:
# print(f)
def ScanMultiFolders(folder_paths, skip_system=True):
"""
扫描多个根目录,返回所有子目录中的文件路径列表。
:param folder_paths: 包含多个根目录的列表
:param skip_system: 是否跳过系统目录(默认 True
:return: 所有文件的相对路径列表(格式为 folder/file.ext
"""
all_files = []
for root_dir in folder_paths:
        # Make sure the path exists
        if not os.path.exists(root_dir):
            print(f"⚠️ Path does not exist: {root_dir}")
            continue
        for root, dirs, files in os.walk(root_dir):
            # Skip NTFS system directories
            if skip_system:
                dirs[:] = [d for d in dirs if not d.startswith('$') and d != "System Volume Information"]
            for file in files:
                full_path = os.path.join(root, file)
                # Drop the drive letter
                _, relative_path = os.path.splitdrive(full_path)
                relative_path = relative_path.lstrip("\\").replace("\\", "/")
all_files.append(relative_path)
return all_files
if __name__ == "__main__":
folders = [
r"Y:\CloudMusic",
r"Y:\folder1"
]
files = ScanMultiFolders(folders)
print(f"共找到 {len(files)} 个文件:")
for f in files:
print(f)
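# A pathlib-based variant of the scan (a sketch, not used by the project),
# assuming an absolute root such as r"Y:\folder1" so that Path.anchor holds the
# drive to strip; Path.rglob cannot prune directories the way os.walk does, so
# system paths are filtered per file instead:
#
#     from pathlib import Path
#
#     def ScanDirectoryPathlib(root_dir, skip_system=True):
#         files = []
#         for p in Path(root_dir).rglob('*'):
#             if not p.is_file():
#                 continue
#             if skip_system and any(part.startswith('$') or part == "System Volume Information"
#                                    for part in p.parts):
#                 continue
#             # as_posix() already uses '/'; relative_to(anchor) drops "Y:\"
#             files.append(p.relative_to(p.anchor).as_posix())
#         return files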

test/get_extent_counts.py Normal file

@@ -0,0 +1,92 @@
def analyze_ntfs_data_attribute(data):
"""
分析 NTFS 数据结构中的80属性($DATA),返回文件分片数量
参数:
data (list): 包含字典的列表,每个字典需有'sequence'
(示例结构见问题描述)
返回:
int: 分片数量(常驻属性返回1非常驻属性返回数据运行的分片数)
异常:
ValueError: 当输入数据无效时抛出
"""
    # Step 1: collect the hex byte strings from every 'sequence'
    hex_bytes = []
    for entry in data:
        if 'sequence' in entry:
            for hex_str in entry['sequence']:
                hex_bytes.extend(hex_str.split())
    # Convert the hex strings into a list of ints
    try:
        attribute_data = [int(x, 16) for x in hex_bytes]
    except ValueError:
        raise ValueError("Invalid hexadecimal data")
    # Step 2: parse the attribute structure
    if len(attribute_data) < 24:
        raise ValueError("Attribute data too short to parse the header")
    # Check the attribute type (0x80)
    if attribute_data[0] != 0x80:
        raise ValueError("Not a 0x80 ($DATA) attribute")
    # Non-resident flag at offset 0x08: 0 means resident
    is_resident = attribute_data[8] == 0
    if is_resident:
        return 1
    else:
        # Data run list offset: the WORD at 0x20 of a non-resident header
        data_run_offset = attribute_data[0x20] | (attribute_data[0x21] << 8)
        if data_run_offset >= len(attribute_data):
            raise ValueError("Data run offset exceeds attribute length")
        data_runs = attribute_data[data_run_offset:]
        fragment_count = 0
        pos = 0
        while pos < len(data_runs):
            header_byte = data_runs[pos]
            if header_byte == 0x00:
                break
            # Run header: low nibble = size of the length field,
            # high nibble = size of the offset (LCN) field
            offset_len = (header_byte >> 4) & 0x0F
            len_len = header_byte & 0x0F
            if len_len == 0 or offset_len == 0:
                break
            # Skip the header byte plus both fields; only the count matters here
            pos += 1 + len_len + offset_len
            fragment_count += 1
        return fragment_count
input_data = [
{
'start_byte': 3221267456,
'offset': 264,
'sequence': [
'80 00 00 00 48 00 00 00',
'01 00 00 00 00 00 01 00',
'00 00 00 00 00 00 00 00',
'79 00 00 00 00 00 00 00',
'40 00 00 00 00 00 00 00',
'00 a0 07 00 00 00 00 00',
'0b 93 07 00 00 00 00 00',
'0b 93 07 00 00 00 00 00',
'31 7a 00 ee 0b 00 00 00'
],
'is_resident': False,
'total_groups': 9,
'attribute_length': 72
}
]
print(analyze_ntfs_data_attribute(input_data))  # print the fragment count
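# Working the example by hand (standard NTFS run encoding assumed): the run list
# starts at offset 0x40 with the bytes 31 7a 00 ee 0b. Header 0x31 -> low nibble
# 1 = one length byte, high nibble 3 = three offset bytes, so the run covers
# 0x7a = 122 clusters starting at LCN 0x0bee00 = 781824; the following 0x00
# terminates the list, hence exactly one fragment.
#
#     header = 0x31
#     len_len, offset_len = header & 0x0F, (header >> 4) & 0x0F   # 1, 3
#     run_length = int.from_bytes(bytes([0x7a]), 'little')        # 122 clusters
#     lcn = int.from_bytes(bytes([0x00, 0xee, 0x0b]), 'little')   # 781824
#     print(len_len, offset_len, run_length, lcn)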


@@ -0,0 +1,105 @@
def ParseDataRuns(data_bytes: list, cluster_size=512):
"""
解析 NTFS $80 属性中的数据运行Data Run返回每个分片的起始字节数和长度。
参数:
data_bytes (list): 十六进制字符串组成的列表,表示完整的 $80 属性内容。
cluster_size (int): 簇大小(默认为 512 字节)
返回:
dict: 包含每个分片信息的字典,格式如下:
{
"is_resident": False,
"data_runs": {
"片段1": {"起始字节数": 3202351104, "字节长度": 499712 - 1},
"片段2": {...}
}
}
"""
def hex_list_to_int(lst, length, byteorder='little'):
"""从列表中提取指定长度的字节并转换为整数"""
bytes_data = bytes([int(x, 16) for x in lst[:length]])
return int.from_bytes(bytes_data, byteorder=byteorder)
    result = {
        "is_resident": True,
        "data_runs": {}
    }
    # Check that this is a $80 attribute
    if data_bytes[0] != '80':
        raise ValueError("Not a $80 attribute")
    # Non-resident flag at offset 0x08 (the 9th byte): '00' means resident
    is_resident = data_bytes[8] == '00'
    result["is_resident"] = is_resident
    if is_resident:
        result["data_runs"]["resident_file"] = {
            "start_byte": 0,
            "byte_length": "file is resident, no fragments"
        }
        return result
    # Non-resident attribute: the data run offset is the WORD at offset 0x20
    data_run_offset = hex_list_to_int(data_bytes[0x20:0x20 + 2], 2)
    if data_run_offset >= len(data_bytes):
        raise ValueError("Data run offset out of range")
    # Slice out the data run list
    data_run_bytes = data_bytes[data_run_offset:]
    pos = 0
    fragment_index = 1
    while pos < len(data_run_bytes):
        header_byte = int(data_run_bytes[pos], 16)
        if header_byte == 0x00:
            break
        # Low nibble: size of the length field; high nibble: size of the offset field
        len_len = header_byte & 0x0F
        offset_len = (header_byte >> 4) & 0x0F
        if len_len == 0 or offset_len == 0:
            break
        pos += 1
        # The length field comes first (little-endian)
        length = hex_list_to_int(data_run_bytes[pos:pos + len_len], len_len)
        # The offset (LCN) field follows (little-endian); NOTE: real NTFS run
        # offsets are signed deltas relative to the previous run, so reading
        # them as absolute values is only exact for the first run
        offset = hex_list_to_int(data_run_bytes[pos + len_len:pos + len_len + offset_len], offset_len)
        # Starting byte = LCN * cluster_size
        start_byte = offset * cluster_size
        byte_length = length * cluster_size - 1
        result["data_runs"][f"fragment{fragment_index}"] = {
            "start_byte": start_byte,
            "byte_length": byte_length
        }
        pos += len_len + offset_len
        fragment_index += 1
    return result
input_data = [
'80', '00', '00', '00', '48', '00', '00', '00',
'01', '00', '00', '00', '00', '00', '01', '00',
'00', '00', '00', '00', '00', '00', '00', '00',
'79', '00', '00', '00', '00', '00', '00', '00',
'40', '00', '00', '00', '00', '00', '00', '00',
'00', 'a0', '07', '00', '00', '00', '00', '00',
'0b', '93', '07', '00', '00', '00', '00', '00',
'0b', '93', '07', '00', '00', '00', '00', '00',
'31', '7a', '00', 'ee', '0b', '00', '00', '00'
]
result = ParseDataRuns(input_data)
print(result)
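# With an assumed 4096-byte cluster (the attribute itself does not record the
# cluster size), the same run decodes to the numbers shown in the docstring:
# LCN 781824 * 4096 = 3202351104 and 122 * 4096 - 1 = 499711.
#
#     result_4k = ParseDataRuns(input_data, cluster_size=4096)
#     print(result_4k)
#     # {'is_resident': False, 'data_runs': {'fragment1':
#     #   {'start_byte': 3202351104, 'byte_length': 499711}}}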

uv.lock generated

@@ -8,14 +8,10 @@ version = "0.1.0"
source = { virtual = "." }
dependencies = [
{ name = "psutil" },
{ name = "pytsk3" },
]
[package.metadata]
requires-dist = [
{ name = "psutil", specifier = ">=7.0.0" },
{ name = "pytsk3", specifier = ">=20250312" },
]
requires-dist = [{ name = "psutil", specifier = ">=7.0.0" }]
[[package]]
name = "psutil"
@@ -31,9 +27,3 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/50/e6/eecf58810b9d12e6427369784efe814a1eec0f492084ce8eb8f4d89d6d61/psutil-7.0.0-cp37-abi3-win32.whl", hash = "sha256:ba3fcef7523064a6c9da440fc4d6bd07da93ac726b5733c29027d7dc95b39d99", size = 241053, upload-time = "2025-02-13T21:54:34.31Z" },
{ url = "https://files.pythonhosted.org/packages/50/1b/6921afe68c74868b4c9fa424dad3be35b095e16687989ebbb50ce4fceb7c/psutil-7.0.0-cp37-abi3-win_amd64.whl", hash = "sha256:4cf3d4eb1aa9b348dec30105c55cd9b7d4629285735a102beb4441e38db90553", size = 244885, upload-time = "2025-02-13T21:54:37.486Z" },
]
[[package]]
name = "pytsk3"
version = "20250312"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/59/3f/2d440547eecca1786c2178a3e010e7fb61da1f0468d9809ff2b5b8fbb39b/pytsk3-20250312.tar.gz", hash = "sha256:bb47d4aa5976adbc8d4350bed719b771c548139bc8efe761e1d081aa99074c1b", size = 5274913, upload-time = "2025-03-12T05:49:14.937Z" }