Compare commits
15 Commits
new_db_sch...main
4d7c2e995c
491685e892
d4a411ce68
1fb457b67d
d2a3a7b5b5
3347abe02f
0c98dfecda
cd536a6bd3
08a47c6d8a
846c7f6beb
deaf97607e
697b449bff
07a4ae7a74
b2e14fdbe0
e167ff5d9f
3  .gitignore  vendored
@@ -175,4 +175,5 @@ cython_debug/
 .pypirc
 
 # Custom stuff
 .idea/
+src/*.db
34  db_manage/clear_table_record.py  Normal file
@@ -0,0 +1,34 @@
import sqlite3


def ClearTableRecordsWithReset(db_path, table_name):
    """
    Clear all records from the given table and reset its autoincrement ID.

    :param db_path: str, path to the SQLite database
    :param table_name: str, name of the table to clear
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    try:
        cursor.execute(f"DELETE FROM {table_name};")
        cursor.execute(f"DELETE FROM sqlite_sequence WHERE name='{table_name}';")
        conn.commit()
        print(f"Table [{table_name}] cleared and autoincrement ID reset")
    except sqlite3.Error as e:
        print(f"❌ Operation failed: {e}")
    finally:
        conn.close()


if __name__ == '__main__':
    ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_path')
    ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_node')
    ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_device')
    ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_config')
    ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_user')
    ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_group')
    ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_extend_extent')
    ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_extend_name')
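One detail worth keeping in mind here: sqlite3 parameter binding only covers values, never identifiers, which is why the table name is interpolated into the SQL text above. A small defensive sketch (safe_table_name is a hypothetical helper, not part of this commit) that whitelists the name before interpolation:

import re

def safe_table_name(table_name: str) -> str:
    # Allow only plain identifiers before interpolating into SQL text.
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", table_name):
        raise ValueError(f"invalid table name: {table_name!r}")
    return table_name

# cursor.execute(f"DELETE FROM {safe_table_name(table_name)};")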
@@ -138,7 +138,7 @@ def CreateDBNodeTable(db_path='../src/db_ntfs_info.db', table_name='db_node'):
         extent4_Length INTEGER,
 
         -- foreign key constraints (optional)
-        FOREIGN KEY(PathID) REFERENCES path_table(ID),
+        FOREIGN KEY(PathID) REFERENCES db_path(ID),
         FOREIGN KEY(ExtendNameID) REFERENCES extname_table(ID),
         FOREIGN KEY(GroupID) REFERENCES groups(ID),
         FOREIGN KEY(UserID) REFERENCES users(ID)
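Worth remembering when exercising these constraints: SQLite ships with foreign-key enforcement switched off, so each connection has to opt in explicitly:

conn = sqlite3.connect(db_path)
conn.execute("PRAGMA foreign_keys = ON")  # enforcement is off by default in SQLite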
23  fake_main.py  Normal file
@@ -0,0 +1,23 @@
import itertools

from files_utils.files_save import CopySingleFragmentFiles, CopyMultiFragmentFiles
from files_utils.files_sort import GetSortFragments
from files_utils.folders_sort import ClassifyFilesAndFolders, ScanMultiFolders

fragment_lists = {}
target_path = r"Z:\test_files"
mix_test_data = [
    "test-copy"
]
classify_files_and_folders = ClassifyFilesAndFolders(mix_test_data)
files_list = classify_files_and_folders["files"]
folders_files_list = ScanMultiFolders(classify_files_and_folders["folders"])
merged_list = list(itertools.chain(files_list, folders_files_list))

sort_fragments = GetSortFragments(db_path="./src/db_ntfs_info.db", files_list=merged_list)
for item in sort_fragments:
    extent_count = item['extent_count']
    if extent_count == 1:
        CopySingleFragmentFiles(item, target_path=target_path)
    elif extent_count > 1:
        CopyMultiFragmentFiles(item, fragment_lists=fragment_lists, target_path=target_path)
131  files_utils/files_save.py  Normal file
@@ -0,0 +1,131 @@
import os


def GetVolumeLetter() -> str:
    from ntfs_utils.main import volume_letter
    return volume_letter


def CopySingleFragmentFiles(source_data_dict, target_path):
    """
    Read data from the disk at the given starting byte and length,
    and save it as the target file.

    :param source_data_dict: dict describing the source data
    :param target_path: target folder path
    """
    start_byte = source_data_dict.get("start_byte")
    byte_length = source_data_dict.get("length")
    absolute_path = source_data_dict.get("absolute_path")
    file_name = source_data_dict.get("filename")

    if byte_length <= 0:
        print("Error: invalid byte length")
        return

    if not absolute_path or not file_name:
        print("Error: required file information is missing")
        return

    source_disk_path = GetVolumeLetter()
    target_file_path = os.path.join(target_path, file_name)

    try:
        # Create the target directory if it does not exist
        os.makedirs(target_path, exist_ok=True)

        with open(fr"\\.\{source_disk_path}:", 'rb') as disk:
            disk.seek(start_byte)

            with open(target_file_path, 'wb') as f:
                remaining = byte_length
                CHUNK_SIZE = 1024 * 1024  # 1 MB
                while remaining > 0:
                    read_size = min(CHUNK_SIZE, remaining)
                    chunk = disk.read(read_size)
                    if not chunk:
                        print("Warning: read empty data; the end of the disk may have been reached.")
                        break
                    f.write(chunk)
                    remaining -= len(chunk)

        print(
            f"Success: read {byte_length} bytes from byte offset {start_byte}, saved as {target_file_path}")

    except PermissionError:
        print("Error: administrator privileges are required to access the disk device; please run this program as administrator")
    except Exception as e:
        print(f"An error occurred: {str(e)}")


def ReadDiskBytes(volume_letter: str, start_byte: int, length: int) -> bytes:
    """
    Read a given number of bytes from the given starting position on the specified disk.

    :param volume_letter: volume letter (e.g. "Y")
    :param start_byte: starting byte position (int)
    :param length: number of bytes to read (int)
    :return: the raw bytes read
    """
    if not isinstance(volume_letter, str) or len(volume_letter.strip()) != 1:
        raise ValueError("drive_letter must be a single letter, e.g. 'Y'")

    # Build the Windows device path format: \\.\Y:
    disk_path = f"\\\\.\\{volume_letter.strip().upper()}:"

    try:
        with open(disk_path, "rb") as disk:
            disk.seek(start_byte)
            data = disk.read(length)
            return data
    except PermissionError:
        raise PermissionError("Insufficient permissions; please run the program as administrator")
    except Exception as e:
        raise RuntimeError(f"Failed to read the disk: {e}")


def CopyMultiFragmentFiles(
        item: dict,
        fragment_lists: dict,
        target_path: str
):
    """
    Handle a file with multiple fragments: read the fragments, concatenate
    them in order, and finally write the result to disk.

    :param item: dict describing one fragment of the file
    :param fragment_lists: dict storing the fragment contents per file
    :param target_path: target path for the recovered file
    :return: None
    """
    file_name = item['filename']
    extent_count = item['extent_count']
    fragment_index = item['fragment_index']
    start_byte = item['start_byte']
    length_byte = item['length']

    volume_letter = GetVolumeLetter()

    # Read the fragment content
    fragment_content = ReadDiskBytes(volume_letter, start_byte, length_byte)

    # Initialize the list for this file if it does not exist yet
    if file_name not in fragment_lists:
        fragment_lists[file_name] = [None] * extent_count

    # Insert the content at its position
    if fragment_index <= extent_count:
        fragment_lists[file_name][fragment_index - 1] = fragment_content
        print(f"Wrote fragment {fragment_index} of {file_name}.")
    else:
        print(f"Warning: fragment_index out of range for {file_name}: {fragment_index} / {extent_count}")

    # Check whether all fragments have been loaded
    fragments = fragment_lists[file_name]
    if None not in fragments:
        full_content = b''.join(fragments)
        target_file_path = os.path.join(target_path, file_name)
        try:
            with open(target_file_path, 'wb') as f:
                f.write(full_content)
            print(f"Successfully recovered file: {file_name}")
        except Exception as e:
            print(f"Failed to write file: {file_name}, error: {e}")
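Raw volume handles on Windows generally require sector-aligned offsets and read lengths; the start_byte/length values here come from cluster arithmetic and usually satisfy that, but the final partial chunk may not. A hedged sketch (assuming 512-byte sectors; the real value should come from GetNTFSBootInfo) that widens a read window to sector boundaries and trims afterwards:

SECTOR = 512  # assumption: 512-byte sectors

def read_aligned(disk, start_byte: int, length: int) -> bytes:
    """Read [start_byte, start_byte + length) from a raw volume handle,
    widening the window to sector boundaries and trimming the result."""
    aligned_start = (start_byte // SECTOR) * SECTOR
    aligned_end = ((start_byte + length + SECTOR - 1) // SECTOR) * SECTOR
    disk.seek(aligned_start)
    data = disk.read(aligned_end - aligned_start)
    skip = start_byte - aligned_start
    return data[skip:skip + length]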
148  files_utils/files_sort.py  Normal file
@@ -0,0 +1,148 @@
import sqlite3


def GetFilesDBPathInfo(db_path: str = "../src/db_ntfs_info.db",
                       table_name: str = "db_path",
                       files_path=None) -> list:
    """
    For each path in the given list, look up the ID and Name fields of the
    matching record in the given table.

    :param db_path: database file path
    :param table_name: name of the table to query
    :param files_path: list of full file paths
    :return: list of results, each {'absolute_path': str, 'id': int, 'name': str}
    """
    if files_path is None:
        files_path = []
    results = []

    # Connect to the database
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    for path in files_path:
        try:
            # The table name is interpolated via string formatting;
            # parameterized queries only apply to values
            sql = f"SELECT ID, Name FROM {table_name} WHERE Path = ?"
            cursor.execute(sql, (path,))
            row = cursor.fetchone()
            if row:
                results.append({
                    'absolute_path': path,
                    'id': row[0],
                    'name': row[1]
                })
            else:
                print(f"No matching record found: {path}")
        except Exception as e:
            print(f"Query failed: {path}, error: {e}")

    conn.close()
    return results


def GetFilesDBNodeInfo(db_path: str = "../src/db_ntfs_info.db", table_name: str = "db_node",
                       path_records: list = None) -> list:
    """
    Use the IDs from the db_path query results to look up the corresponding
    extent (fragment) information in the db_node table.

    :param db_path: database file path
    :param table_name: db_node table name
    :param path_records: result list from GetFilesDBPathInfo
    :return: result list with fragment information per file
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    results = []

    for record in path_records:
        path_id = record['id']
        absolute_path = record['absolute_path']
        name = record['name']

        try:
            # Query the db_node table for the record matching PathID
            cursor.execute(f"SELECT * FROM {table_name} WHERE PathID = ?", (path_id,))
            row = cursor.fetchone()

            if not row:
                print(f"No record for PathID={path_id} in table {table_name}")
                continue

            # Collect the column names (for access by name)
            columns = [desc[0] for desc in cursor.description]

            # Build a dict so fields can be accessed by column name
            node_data = dict(zip(columns, row))

            # Read ExtentCount
            extent_count = node_data.get("ExtentCount", 0)

            # Parse the fragment information
            fragments = []
            for i in range(1, 5):  # extent1 ~ extent4
                loc = node_data.get(f"extent{i}_Location")
                length = node_data.get(f"extent{i}_Length")

                if loc is not None and length is not None and length > 0:
                    fragments.append({
                        "start_byte": loc,
                        "length": length
                    })

            results.append({
                "absolute_path": absolute_path,
                "name": name,
                "path_id": path_id,
                "extent_count": extent_count,
                "fragments": fragments
            })

        except Exception as e:
            print(f"Query failed: PathID={path_id}, error: {e}")

    conn.close()
    return results


def SortFragmentsByStartByte(file_extents_list: list) -> list:
    """
    Sort the fragments of all files by start_byte and annotate each fragment
    with its index.

    :param file_extents_list: result list from GetFilesDBNodeInfo
    :return: fragment list sorted by start_byte, each entry carrying the file
             path, file name, fragment index, and so on
    """
    all_fragments = []

    for file_info in file_extents_list:
        absolute_path = file_info['absolute_path']
        filename = file_info['name']
        extent_count = file_info['extent_count']
        fragments = file_info['fragments']

        # Sort this file's fragments (they are usually already ordered)
        sorted_fragments = sorted(fragments, key=lambda x: x['start_byte'])

        # Attach the fragment index
        for idx, fragment in enumerate(sorted_fragments, start=1):
            all_fragments.append({
                'absolute_path': absolute_path,
                'filename': filename,
                'extent_count': extent_count,
                'start_byte': fragment['start_byte'],
                'length': fragment['length'],
                'fragment_index': idx
            })

    # Global sort: order all fragments by start_byte
    all_fragments.sort(key=lambda x: x['start_byte'])

    return all_fragments


def GetSortFragments(db_path: str = "../src/db_ntfs_info.db", files_list: list = None) -> list:
    path_info = GetFilesDBPathInfo(db_path=db_path, table_name="db_path", files_path=files_list)
    node_info = GetFilesDBNodeInfo(db_path=db_path, table_name="db_node", path_records=path_info)
    result = SortFragmentsByStartByte(node_info)
    return result
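As a quick orientation for the pipeline above, a hypothetical call (the path must already exist in the db_path table for anything to come back):

fragments = GetSortFragments(
    db_path="../src/db_ntfs_info.db",
    files_list=["CloudMusic/AGA - MIZU.mp3"],
)
for frag in fragments:
    print(frag['filename'], frag['fragment_index'], frag['start_byte'], frag['length'])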
14  files_utils/folders_save.py  Normal file
@@ -0,0 +1,14 @@
import subprocess

source_path = r"Y:\\test-copy"
target_path = r"Z:\\test-copy"

subprocess.run([
    "robocopy",
    source_path,
    target_path,
    "/E",     # include subdirectories
    "/R:3",   # retry count
    "/W:1",   # wait time between retries
    "/MT:16"  # multithreaded (16 threads)
])
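robocopy uses non-standard exit codes: values below 8 indicate success (1, for example, means files were copied), while 8 and above indicate at least one failure. A sketch of checking that after the call above:

result = subprocess.run(["robocopy", source_path, target_path, "/E", "/R:3", "/W:1", "/MT:16"])
if result.returncode >= 8:
    print(f"robocopy reported errors (exit code {result.returncode})")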
263  files_utils/folders_sort.py  Normal file
@@ -0,0 +1,263 @@
import os
import sqlite3

from files_utils.files_sort import GetFilesDBNodeInfo, SortFragmentsByStartByte
from ntfs_utils.main import volume_letter


def GetFolderID(
        folder_path: str,
        db_path: str = "../src/db_ntfs_info.db",
        table_name: str = "db_path"
) -> int | None:
    """
    Look up the database ID of the given folder path.

    :param folder_path: folder path (e.g. r"CloudMusic\\")
    :param db_path: database file path
    :param table_name: table to query, 'db_path' by default
    :return: the ID (int) on success, None on failure
    """

    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    try:
        # Build the SQL query with table_name
        sql = f"SELECT ID FROM {table_name} WHERE Path = ?"
        cursor.execute(sql, (folder_path,))
        result = cursor.fetchone()

        if result:
            return result[0]
        else:
            print(f"Path not found: {folder_path} in table {table_name}")
            return None

    except sqlite3.Error as e:
        print(f"Database operation failed: {e}")
        return None

    finally:
        conn.close()


def GetSubPathsByParentID(
        parent_id: int,
        db_path: str = "../src/db_ntfs_info.db",
        table_name: str = "db_path"
) -> list:
    """
    Query the db_path table for the children (files/folders) of the given ParentID.

    :param parent_id: parent node ID
    :param db_path: database file path
    :param table_name: table name
    :return: list of dicts containing ID, Path, and Name
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    sql = f"""
        SELECT ID, Path, Name
        FROM {table_name}
        WHERE ParentID = ?
    """

    try:
        cursor.execute(sql, (parent_id,))
        rows = cursor.fetchall()
    except Exception as e:
        print(f"Database query failed: {e}")
        return []

    results = []
    for row in rows:
        item = {
            'id': row[0],
            'absolute_path': row[1],
            'name': row[2]
        }
        results.append(item)

    conn.close()
    return results


def GetSortFragmentsByFolderPath(db_path: str = "../src/db_ntfs_info.db", folder_path: str = None) -> list:
    """
    Query the database for the fragment information of every file under the
    given folder path.

    :param db_path: database to query
    :param folder_path: absolute path of the folder
    :return list: files under the folder, ordered by fragment
    """
    parent_id = GetFolderID(folder_path=folder_path, db_path=db_path)
    path_data = GetSubPathsByParentID(parent_id=parent_id, db_path=db_path)
    node_data = GetFilesDBNodeInfo(path_records=path_data)
    result = SortFragmentsByStartByte(node_data)

    return result


# if __name__ == "__main__":
#     folder_path_test = "pictures/"
#     data = GetSortFragmentsByFolderPath(db_path="../src/db_ntfs_info.db", folder_path=folder_path_test)
#     for item in data:
#         print(item)


def ScanDirectory(root_dir, skip_system=True):
    """
    Recursively scan the given directory and return a list of paths relative
    to the volume (separated with '/', without the drive letter).

    :param root_dir: root directory to scan
    :param skip_system: whether to skip system directories (default True)
    :return: list of file paths in the form relative/path/to/file.ext
    """
    file_list = []

    for root, dirs, files in os.walk(root_dir):
        # Skip system directories
        if skip_system:
            dirs[:] = [d for d in dirs if not d.startswith('$') and d != "System Volume Information"]

        for file in files:
            full_path = os.path.join(root, file)

            # Strip the drive letter
            _, relative_path = os.path.splitdrive(full_path)

            # Replace \ with /
            relative_path = relative_path.lstrip("\\").replace("\\", "/")

            file_list.append(relative_path)

    return file_list


# if __name__ == "__main__":
#     folder_path = r"Y:/folder1/"
#     files_list = ScanDirectory(folder_path)
#
#     print(f"Found {len(files_list)} files in total:")
#     for f in files_list:
#         print(f)


def ScanMultiFolders(folder_paths_list: list, skip_system: bool = True) -> list:
    """
    Scan multiple root directories and return the relative paths of all files
    (in the form folder/file.ext).

    :param folder_paths_list: root directories to scan (e.g. ["CloudMusic/", "folder1/"])
    :param skip_system: whether to skip system directories
    :return: list of file paths (normalized to folder/file.ext)
    """
    all_files = []

    for root_dir in folder_paths_list:
        # Normalize the input path; make sure directories end with '/'
        normalized_root_dir = root_dir.replace("\\", "/")
        if not normalized_root_dir.endswith("/"):
            normalized_root_dir += "/"  # ensure trailing /

        full_root_path = f"{volume_letter}:/{normalized_root_dir}"
        full_root_path = os.path.normpath(full_root_path)

        if not os.path.exists(full_root_path):
            print(f"⚠️ Path does not exist: {full_root_path}")
            continue

        for root, dirs, files in os.walk(full_root_path):
            if skip_system:
                dirs[:] = [d for d in dirs if not d.startswith('$') and d != "System Volume Information"]

            for file in files:
                full_file_path = os.path.join(root, file)

                # Strip the drive letter
                _, relative_path = os.path.splitdrive(full_file_path)

                # Trim leading/trailing '\' or '/' and normalize separators
                normalized_path = relative_path.strip("\\").strip("/").replace("\\", "/")

                all_files.append(normalized_path)

    return all_files


# if __name__ == "__main__":
#     folders = [
#         "CloudMusic\\",
#         "folder1/"
#     ]
#
#     files = ScanMultiFolders(folders)
#
#     print(f"Found {len(files)} files in total:")
#     for f in files:
#         print(f)


def ClassifyFilesAndFolders(paths: list) -> dict:
    """
    Classify the given paths into files and directories, normalizing to '/'
    separators. Directory paths are guaranteed to end with '/'.

    :param paths: list of paths (each may be a file or a directory)
    :return: dict with 'files' and 'folders' keys, paths normalized to '/'
    """
    files = []
    directories = []

    for path in paths:
        # Normalize to '/' separators, preserving the original structure
        # (whether or not it ends with '/')
        normalized_path = path.replace("\\", "/")

        # Did the input look like a directory (trailing '/' or '\')?
        is_potential_dir = normalized_path.endswith("/")

        # Build the full path to check whether it exists
        full_path = f"{volume_letter}:/{normalized_path.lstrip('/')}"
        full_path = os.path.normpath(full_path)

        if os.path.isfile(full_path):
            # Files: drop any trailing /
            if normalized_path.endswith("/"):
                normalized_path = normalized_path.rstrip("/")
            files.append(normalized_path)
        elif os.path.isdir(full_path):
            # Directories: make sure the path ends with '/'
            if not normalized_path.endswith("/"):
                normalized_path += "/"
            directories.append(normalized_path)
        else:
            print(f"⚠️ Path does not exist or has unknown type: {normalized_path}")

    return {
        'files': files,
        'folders': directories
    }

# if __name__ == "__main__":
#     test_paths = [
#         "CloudMusic\\AGA - MIZU.mp3",
#         "CloudMusic/AGA - 一.mp3",
#         "CloudMusic/Aaron Zigman - Main Title.mp3",
#         "CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3",
#         "CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3",
#         "CloudMusic/Ava Max - Sweet but Psycho.mp3",
#         "CloudMusic\\",
#         "folder1/",
#         "CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3",
#         "CloudMusic/Color Music Choir - Something Just Like This (Live).mp3"
#     ]
#
#     result = ClassifyFilesAndFolders(test_paths)
#
#     print("✅ Files:")
#     for f in result['files']:
#         print(f)
#
#     print("\n📁 Folders:")
#     for d in result['folders']:
#         print(d)
ntfs_utils/db_config.py

@@ -8,9 +8,10 @@ def GetNTFSBootInfo(volume_letter):
     - Bytes per sector
     - Sectors per cluster
     - Cluster size (bytes)
+    - $MFT starting cluster number (LCN)
 
     Parameters:
-        volume_letter: volume label string, e.g. 'C'
+        volume_letter: str, volume label string, e.g. 'C'
 
     Returns:
         dict containing the above information
@@ -62,10 +63,15 @@ def GetNTFSBootInfo(volume_letter):
     # Compute the cluster size
     cluster_size = bytes_per_sector * sectors_per_cluster
 
+    # Parse the $MFT starting cluster number (LCN): offset 0x30, QWORD (8 bytes)
+    mft_lcn_bytes = buffer[0x30:0x38]
+    mft_lcn = int.from_bytes(mft_lcn_bytes, byteorder='little', signed=False)
+
     return {
         "BytesPerSector": bytes_per_sector,
         "SectorsPerCluster": sectors_per_cluster,
-        "ClusterSize": cluster_size
+        "ClusterSize": cluster_size,
+        "MftPosition": mft_lcn
     }
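A small worked example of what the new MftPosition field buys: with 512-byte sectors and 8 sectors per cluster (example values; in practice both come from the boot sector), an LCN of 786432 places $MFT at sector 6291456, matching the default mentioned in CalculateFileMftStartSector further down.

mft_lcn = 786432                                    # example LCN read from offset 0x30
sectors_per_cluster = 8                             # assumption; read from the boot sector
mft_start_sector = mft_lcn * sectors_per_cluster    # 6291456
mft_byte_offset = mft_start_sector * 512            # 3221225472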
266  ntfs_utils/db_node.py  Normal file
@@ -0,0 +1,266 @@
import hashlib
import os
import sqlite3
from datetime import datetime

from ntfs_utils.mft_analyze import GetFile80hPattern, GetFragmentData, ExtractSequenceHexValues, hex_list_to_int
from ntfs_utils.main import volume_letter


# Helper: get a file's extension
def GetFileExtension(name: str) -> str:
    parts = name.rsplit('.', 1)
    return parts[1].lower() if len(parts) > 1 else ""


# Get the ExtendNameID (based on the file name suffix)
def GetExtendNameId(name: str, cursor: sqlite3.Cursor) -> int:
    ext = GetFileExtension(name)
    if not ext:
        return 0

    cursor.execute("SELECT ID FROM db_extend_name WHERE ExtendName = ?", (ext,))
    result = cursor.fetchone()
    return result[0] if result else 0


# Get the DirLayer (path depth)
def GetDirLayer(path: str) -> int:
    path = path.strip()
    if not path or path == "\\":
        return 0
    return path.count("\\") - 1


# Get the GroupID (first row by default)
def GetFirstGroupId(cursor: sqlite3.Cursor) -> int:
    cursor.execute("SELECT ID FROM db_group ORDER BY ID LIMIT 1")
    result = cursor.fetchone()
    return result[0] if result else 0


# Get the UserID (first row by default)
def GetFirstUserId(cursor: sqlite3.Cursor) -> int:
    cursor.execute("SELECT ID FROM db_user ORDER BY ID LIMIT 1")
    result = cursor.fetchone()
    return result[0] if result else 0


def GetFilesTime(file_path):
    """
    Get the creation, modification, access, and permission-change times of a file.
    st_atime: last access time (FileAccessTime)
    st_mtime: last content modification time (FileModifyTime)
    st_ctime: metadata change time; on Windows this is the creation time (FileCreateTime)

    Parameters:
        file_path (str): absolute path of the file

    Returns:
        dict: FileCreateTime, FileModifyTime, FileAccessTime, FileAuthTime as
              strings, or "default" where a value cannot be obtained.
    """
    if not os.path.exists(file_path):
        return {
            "FileCreateTime": "default",
            "FileModifyTime": "default",
            "FileAccessTime": "default",
            "FileAuthTime": "default"
        }

    try:
        stat_info = os.stat(file_path)

        def ts_to_str(timestamp):
            return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')

        create_time = ts_to_str(stat_info.st_ctime)
        modify_time = ts_to_str(stat_info.st_mtime)
        access_time = ts_to_str(stat_info.st_atime)

        # Permission-change time; may not apply on Windows
        try:
            auth_time = ts_to_str(getattr(stat_info, 'st_birthtime', stat_info.st_ctime))
        except Exception:
            auth_time = "default"

        return {
            "FileCreateTime": create_time,
            "FileModifyTime": modify_time,
            "FileAccessTime": access_time,
            "FileAuthTime": auth_time
        }

    except Exception as e:
        print(f"❌ Failed to get file times: {e}")
        return {
            "FileCreateTime": "default",
            "FileModifyTime": "default",
            "FileAccessTime": "default",
            "FileAuthTime": "default"
        }


# Get the device ID (first record in db_device)
def GetDeviceId(cursor: sqlite3.Cursor) -> int:
    cursor.execute("SELECT ID FROM db_device ORDER BY ID LIMIT 1")
    result = cursor.fetchone()
    return result[0] if result else 0


# Get the file size (placeholder data)
def GetFileSize(file80h_pattern):
    if not file80h_pattern or not isinstance(file80h_pattern, list):
        return 0

    if file80h_pattern[0].get('is_resident'):
        fragments = GetFragmentData(file80h_pattern)
        if fragments and len(fragments) > 0:
            return fragments[0].get('byte_length', 0)
    else:
        sequence_list = ExtractSequenceHexValues(file80h_pattern)
        if len(sequence_list) < 64:
            raise ValueError("Sequence too short to parse the file size")

        size_list = sequence_list[56:64]
        size = hex_list_to_int(size_list)
        return size


# Get the file content hash (placeholder data)
def GetFileHash(full_path: str) -> str:
    return hashlib.sha256(full_path.encode()).hexdigest()


# New: get a fragment's location and length
def GetFragmentLocation(fragment):
    return fragment.get('starting_byte', 0)


def GetFragmentLength(fragment):
    return fragment.get('byte_length', 0)


# Main function: import db_path data into db_node
def InsertNodeDataToDB(db_path='../src/db_ntfs_info.db', table_name='db_node', batch_size=20):
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    if len(volume_letter) == 1:
        volume_root = f"{volume_letter}:\\"
    elif volume_letter.endswith(':'):
        volume_root = f"{volume_letter}\\"
    else:
        volume_root = f"{volume_letter}:\\"  # accepts "Y" or "Y:" as input

    print(f"🔍 Processing volume root: {volume_root}")

    group_id = GetFirstGroupId(cursor)
    user_id = GetFirstUserId(cursor)
    device_id = GetDeviceId(cursor)

    cursor.execute("SELECT ID, Path, Name, ParentID FROM db_path")
    rows = cursor.fetchall()

    insert_fields = [
        'PathID', 'ParentID', 'NameHash', 'PathHash',
        'ExtendNameID', 'DirLayer', 'GroupID', 'UserID',
        'FileCreateTime', 'FileModifyTime', 'FileAccessTime', 'FileAuthTime',
        'FileSize', 'FileMode', 'FileHash', 'ExtentCount',
        # extent fields
        "extent1_DeviceID", "extent1_Location", "extent1_Length",
        "extent2_DeviceID", "extent2_Location", "extent2_Length",
        "extent3_DeviceID", "extent3_Location", "extent3_Length",
        "extent4_DeviceID", "extent4_Location", "extent4_Length"
    ]
    insert_placeholders = ', '.join('?' * len(insert_fields))
    insert_sql = f"INSERT INTO {table_name} ({', '.join(insert_fields)}) VALUES ({insert_placeholders})"

    batch = []

    for row in rows:
        path_id, relative_path, name, parent_id = row

        full_path = os.path.join(volume_root, relative_path)

        # Skip rows whose PathID already exists
        cursor.execute("SELECT COUNT(*) FROM db_node WHERE PathID = ?", (path_id,))
        exists = cursor.fetchone()[0]
        if exists > 0:
            print(f"⚠️ PathID {path_id} already exists, skipping insert")
            continue

        try:
            file80h_pattern = GetFile80hPattern(full_path)
            fragments = GetFragmentData(file80h_pattern)
            extent_count = min(len(fragments), 4)
        except Exception as e:
            print(f"⚠️ Failed to get ExtentCount, using default 0: {e}")
            fragments = []
            extent_count = 0

        # Compute the fields
        name_hash = hashlib.sha256(name.encode()).hexdigest()
        dir_layer = GetDirLayer(relative_path)
        extend_name_id = GetExtendNameId(name, cursor)

        try:
            file_size = GetFileSize(file80h_pattern)
        except Exception as e:
            print(f"⚠️ Failed to get the file size, using default 0: {e}")
            file_size = 0

        file_hash = GetFileHash(full_path)

        # Gather the time information
        file_times = GetFilesTime(full_path)
        create_time = file_times["FileCreateTime"]
        modify_time = file_times["FileModifyTime"]
        access_time = file_times["FileAccessTime"]
        auth_time = file_times["FileAuthTime"]

        # Look up the PathHash
        cursor.execute("SELECT PathHash FROM db_path WHERE ID = ?", (path_id,))
        path_hash_result = cursor.fetchone()
        path_hash = path_hash_result[0] if path_hash_result else ""

        # Build the extent fields
        extent_data = []
        for i in range(4):  # at most 4 extents
            if i < len(fragments):
                frag = fragments[i]
                location = GetFragmentLocation(frag)
                length = GetFragmentLength(frag)
                extent_data.extend([device_id, location, length])
            else:
                extent_data.extend([None, None, None])

        # Build the row to insert
        values = [
            path_id, parent_id, name_hash, path_hash,
            extend_name_id, dir_layer, group_id, user_id,
            create_time, modify_time, access_time, auth_time,
            file_size, 'default', file_hash, extent_count,
            *extent_data
        ]

        batch.append(values)

        # Batch insert
        if len(batch) >= batch_size:
            cursor.executemany(insert_sql, batch)
            conn.commit()
            print(f"✅ Committed a batch of {len(batch)} records to {table_name}")
            batch.clear()

    # Insert the final partial batch
    if batch:
        cursor.executemany(insert_sql, batch)
        conn.commit()
        print(f"✅ Committed the final batch of {len(batch)} records to {table_name}")

    conn.close()
    print(f"✅ Data successfully inserted into the {table_name} table")


if __name__ == "__main__":
    InsertNodeDataToDB()
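The per-row SELECT COUNT(*) duplicate check costs one extra query per path. If PathID carried a UNIQUE index (an assumption — the db_node schema is not shown in this diff), INSERT OR IGNORE would achieve the same skip without the round trip, a sketch:

cursor.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_db_node_pathid ON db_node(PathID)")
insert_sql = f"INSERT OR IGNORE INTO {table_name} ({', '.join(insert_fields)}) VALUES ({insert_placeholders})"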
ntfs_utils/db_path.py

@@ -23,27 +23,26 @@ def ShouldSkipPath(path: str) -> bool:
     return False
 
 
-def ScanVolume(volume_letter: str):
+def ScanVolume(volume_letter: str) -> list:
     """
     Fully scan all files and directories on the given volume, skipping NTFS
     metafiles and system folders, and assign a ParentID to every node.
 
     Returns:
         list of dict: a list of dicts describing each file/directory
     """
 
     root_path = f"{volume_letter.upper()}:\\"
     if not os.path.exists(root_path):
         raise ValueError(f"Volume {root_path} does not exist")
 
     result = []
-    path_to_id = {}  # records the mapping from path to database ID
-    counter = 1  # simulated database autoincrement ID
+    path_to_id = {}  # path -> ID mapping
+    counter = 1
 
     for root, dirs, files in os.walk(root_path, topdown=True, onerror=None, followlinks=False):
         # Filter out directories that should be skipped
         dirs[:] = [d for d in dirs if not ShouldSkipPath(os.path.join(root, d))]
 
-        for entry in files + dirs:
+        entries = files + dirs
+
+        for entry in entries:
             full_path = os.path.join(root, entry)
 
             if ShouldSkipPath(full_path):
@@ -61,21 +60,32 @@ def ScanVolume(volume_letter: str):
 
                 name = entry
 
-                # ✅ Fix: hash the Path field
-                path_hash = GenerateHash(full_path)
+                # Split off the drive letter and normalize the path format
+                _, relative_path = os.path.splitdrive(full_path)
+                relative_path = relative_path.lstrip("\\").rstrip("\\")
+                if os.path.isdir(full_path) and not relative_path.endswith("/"):
+                    relative_path += "/"
+
+                relative_path = relative_path.replace("\\", "/")
+
+                path_hash = GenerateHash(relative_path)
 
                 # Compute ContentSize (KB); small files show as at least 1 KB
                 content_size = bytes_size // 1024
                 if content_size == 0 and bytes_size > 0:
                     content_size = 1
 
                 # Get the parent directory path
                 parent_path = os.path.dirname(full_path)
-                parent_id = path_to_id.get(parent_path, 0)  # defaults to 0 (the root may not be recorded)
+                _, parent_relative_path = os.path.splitdrive(parent_path)
+                parent_relative_path = parent_relative_path.lstrip("\\").rstrip("\\")
+                if os.path.isdir(parent_path) and not parent_relative_path.endswith("/"):
+                    parent_relative_path += "/"
+                parent_relative_path = parent_relative_path.replace("\\", "/")
+
+                parent_id = path_to_id.get(parent_relative_path, 0)
 
                 item = {
                     "ID": counter,
-                    "Path": full_path,
+                    "Path": relative_path,
                     "Name": name,
                     "PathHash": path_hash,
                     "IsDir": is_dir,
@@ -83,25 +93,27 @@ def ScanVolume(volume_letter: str):
                     "ContentSize": content_size
                 }
 
-                result.append(item)
-                path_to_id[full_path] = counter
+                yield item  # yield each record as it is produced
+                path_to_id[relative_path] = counter
                 counter += 1
 
             except Exception as e:
                 print(f"⚠️ Skipping path {full_path}, error: {e}")
 
     return result
 
 
-def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_path', batch_size=20):
+def InsertPathDataToDB(data_generator, db_path='../src/db_ntfs_info.db', table_name='db_path', batch_size=20):
     """
-    Write the scan results to the database in batches.
+    Stream the scan results into the database, inserting while scanning.
 
+    :param data_generator: any iterable (e.g. a generator)
+    :param db_path: database path
+    :param table_name: table name
+    :param batch_size: number of records per commit
     """
     conn = sqlite3.connect(db_path)
     cursor = conn.cursor()
 
     try:
         # Create the table if it does not exist
         create_table_sql = f"""
             CREATE TABLE IF NOT EXISTS {table_name} (
                 ID INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -111,23 +123,20 @@ def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_path', batch_size=20):
                 IsDir INTEGER NOT NULL CHECK(IsDir IN (0, 1)),
                 ParentID INTEGER,
                 ContentSize INTEGER,
 
                 FOREIGN KEY(ParentID) REFERENCES {table_name}(ID)
             );
         """
         cursor.execute(create_table_sql)
 
         # Insert statement (ignores duplicate PathHash)
         insert_sql = f"""
             INSERT OR IGNORE INTO {table_name}
             (Path, Name, PathHash, IsDir, ParentID, ContentSize)
             VALUES (?, ?, ?, ?, ?, ?)
         """
 
         total_inserted = 0
         batch = []
 
-        for item in data:
+        for item in data_generator:
             batch.append((
                 item['Path'],
                 item['Name'],
@@ -140,39 +149,34 @@ def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_path', batch_size=20):
             if len(batch) >= batch_size:
                 cursor.executemany(insert_sql, batch)
                 conn.commit()
                 total_inserted += cursor.rowcount
                 print(f"✅ Committed a batch of {len(batch)} records")
                 batch.clear()
 
-        # Insert the remaining data
+        # Commit the final partial batch
         if batch:
             cursor.executemany(insert_sql, batch)
             conn.commit()
             total_inserted += cursor.rowcount
             print(f"✅ Committed the final batch of {len(batch)} records")
 
         print(f"✅ Inserted {total_inserted} records into the database in total.")
 
     except Exception as e:
         print(f"❌ Insert failed: {e}")
         conn.rollback()
 
     finally:
         conn.close()
 
 
-# Example main function
-def main():
-    volume_letter = "Z"
-
+def DBPathMain(volume_letter: str):
     print(f"🔍 Starting full scan of volume {volume_letter}:\\ ...")
-    scanned_data = ScanVolume(volume_letter)
-
-    print(f"📊 Scanned {len(scanned_data)} valid records in total, starting insert...")
-    InsertPathDataToDB(scanned_data)
+    # Obtain the generator object
+    generator = ScanVolume(volume_letter)
+
+    print(f"📊 Starting batched insert...")
+    InsertPathDataToDB(generator)
 
     print("✅ Full scan and insert complete")
 
 
 if __name__ == "__main__":
-    main()
+    DBPathMain(volume_letter="Y")
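The switch from a returned list to a generator keeps memory flat on large volumes: records are inserted as os.walk produces them instead of being accumulated first. The same batching can be expressed generically, a sketch:

import itertools

def batched(iterable, n):
    """Yield lists of up to n items from any iterable (generator-friendly)."""
    it = iter(iterable)
    while chunk := list(itertools.islice(it, n)):
        yield chunk

# for batch in batched(ScanVolume("Y"), 20):
#     cursor.executemany(insert_sql, [row_tuple(item) for item in batch])  # row_tuple: hypothetical adapter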
ntfs_utils/main.py

@@ -1,14 +1,15 @@
-from db_config import GetNTFSBootInfo, InsertInfoToDBConfig
-from db_device import ScanSpecialVolumes, InsertVolumesToDB
-from db_extend_name import InsertExtensionsToDB
-from db_group import InsertGroupToDB
-from db_path import GenerateHash, ShouldSkipPath, ScanVolume, InsertPathDataToDB
-from db_user import InsertUserToDB
+from ntfs_utils.db_config import GetNTFSBootInfo, InsertInfoToDBConfig
+from ntfs_utils.db_device import ScanSpecialVolumes, InsertVolumesToDB
+from ntfs_utils.db_extend_name import InsertExtensionsToDB
+from ntfs_utils.db_group import InsertGroupToDB
+# from ntfs_utils.db_node import InsertNodeDataToDB
+from ntfs_utils.db_path import DBPathMain
+from ntfs_utils.db_user import InsertUserToDB
+
+volume_letter = 'Y'
 
 
 def main():
-    volume_letter = 'Z'
-
     # Initialize the db_config table
     config_data = GetNTFSBootInfo(volume_letter)
     InsertInfoToDBConfig(config_data)
@@ -25,10 +26,6 @@ def main():
     group_name_list = ["Copier"]
     InsertGroupToDB(group_name_list)
 
-    # Initialize the db_path table
-    scanned_data = ScanVolume(volume_letter)
-    InsertPathDataToDB(scanned_data)
-
     # Initialize the db_extend_name table
     common_extensions = [
         "txt", "log", "csv", "xls", "xlsx", "doc", "docx",
@@ -40,6 +37,12 @@ def main():
     count = InsertExtensionsToDB(common_extensions)
     print(f"Inserted {count} new extensions.")
 
+    # Initialize the db_path table
+    DBPathMain(volume_letter=volume_letter)
+
+    # Initialize the db_node table
+    # InsertNodeDataToDB(volume_letter)
+
 
 if __name__ == '__main__':
     main()
408  ntfs_utils/mft_analyze.py  Normal file
@@ -0,0 +1,408 @@
import os

import pytsk3

from ntfs_utils.db_config import GetNTFSBootInfo


def find_file_mft_entry(fs, target_path):
    """
    Find a file's MFT Entry number in an NTFS file system by path.
    """

    def traverse_directory(inode, path_components):
        if not path_components:
            return inode

        dir_name = path_components[0].lower()
        try:
            directory = fs.open_dir(inode=inode)
        except Exception as e:
            print(f"Error opening directory with inode {inode}: {e}")
            return None

        for entry in directory:
            if not entry.info or not entry.info.name or not entry.info.meta:
                continue

            name = entry.info.name.name.decode('utf-8', errors='ignore').lower()
            meta = entry.info.meta

            # Match the directory or file name at the current level
            if name == dir_name:
                if len(path_components) == 1:
                    # This is the target file/directory
                    return meta.addr

                elif meta.type == pytsk3.TSK_FS_META_TYPE_DIR:
                    # Keep descending into subdirectories
                    next_inode = entry.info.meta.addr
                    result = traverse_directory(next_inode, path_components[1:])
                    if result:
                        return result
        return None

    # Split the path
    path_parts = target_path.strip("\\").lower().split("\\")
    root_inode = fs.info.root_inum  # MFT Entry of the root directory
    return traverse_directory(root_inode, path_parts)


def GetFileMftEntry(file_path):
    """
    Get the NTFS MFT Entry number of the given file.
    """

    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    # Get the drive letter
    drive_letter = os.path.splitdrive(file_path)[0][0]
    device = f"\\\\.\\{drive_letter}:"

    # print(f"Opening device: {device}")

    try:
        img = pytsk3.Img_Info(device)
        fs = pytsk3.FS_Info(img)
    except Exception as e:
        raise RuntimeError(f"Failed to open device '{device}': {e}")

    # Build the relative path
    abs_path = os.path.abspath(file_path)
    root_path = f"{drive_letter}:\\"
    rel_path = os.path.relpath(abs_path, root_path).replace("/", "\\")

    # print(f"Looking up MFT entry for: {rel_path}")

    mft_entry = find_file_mft_entry(fs, rel_path)
    # print(f"MFT Entry: {mft_entry}")
    if mft_entry is None:
        raise RuntimeError("Could not find MFT entry for the specified file.")

    return mft_entry


def CalculateFileMftStartSector(mft_entry, volume_letter="Z"):
    """
    Compute the starting sector number of a file's MFT Entry from its MFT
    Entry number.

    Parameters:
        mft_entry (int): the file's MFT Entry number (i.e. inode)
        mft_start_sector (int): starting sector of $MFT, default 6291456
        mft_entry_size (int): size of each MFT Entry in bytes, default 1024
        bytes_per_sector (int): bytes per sector, default 512

    Returns:
        int: starting sector number of the file's MFT Entry
    """
    if mft_entry < 0:
        raise ValueError("The MFT Entry number cannot be negative")

    # Get the NTFS boot information
    config_data = GetNTFSBootInfo(volume_letter)
    # Compute the starting sector of the file's MFT Entry
    start_sector = config_data["MftPosition"] * 8 + mft_entry * 2
    if start_sector < 0:
        raise ValueError("The starting sector number cannot be negative")
    # print(f"Starting sector of the file's MFT Entry: {start_sector}")
    return start_sector


def Get80hPattern(sector_number, volume_letter="Z"):
    """
    Read an NTFS sector and look for data matching a specific pattern.

    Parameters:
        sector_number (int): sector number to read
        drive_path (str): disk device path, volume Z by default

    Returns:
        list: one entry per match, each of the form:
        {
            'start_byte': starting byte of the file's MFT Entry (StartSector * 512),
            'offset': offset of this $80 attribute within the sector,
            'sequence': list of raw data groups (each a string "xx xx xx ..."),
            'is_resident': whether the attribute is resident,
            'total_groups': number of groups actually read,
            'attribute_length': total attribute length in bytes
        }
    """
    drive_path = fr"\\.\{volume_letter}:"
    SECTOR_SIZE = 512
    GROUP_SIZE = 8  # 8 bytes per group
    MATCH_BYTE = 0x80  # leading byte to match
    results = []

    try:
        with open(drive_path, 'rb') as disk:
            disk.seek(sector_number * SECTOR_SIZE)
            sector_data = disk.read(SECTOR_SIZE)

            if not sector_data or len(sector_data) < GROUP_SIZE:
                print(f"Error: cannot read sector {sector_number}")
                return results

            groups = [sector_data[i:i + GROUP_SIZE] for i in range(0, len(sector_data), GROUP_SIZE)]

            for i in range(len(groups)):
                current_group = groups[i]

                if len(current_group) < GROUP_SIZE:
                    continue

                if current_group[0] == MATCH_BYTE:
                    # Take bytes 5~8 as the attribute length (little-endian DWORD)
                    if i + 1 >= len(groups):
                        print(f"Warning: fewer than two groups after the current one, skipping offset {i * GROUP_SIZE:04X}h")
                        continue

                    attribute_length_bytes = b''.join([
                        groups[i][4:8],  # bytes 4~7 of the first group
                        groups[i + 1][0:4] if i + 1 < len(groups) else b'\x00\x00\x00\x00'
                    ])

                    attribute_length = int.from_bytes(attribute_length_bytes[:4], byteorder='little')

                    # Number of groups to read (rounded up to 8 bytes)
                    total_groups = (attribute_length + GROUP_SIZE - 1) // GROUP_SIZE

                    end_idx = i + total_groups
                    if end_idx > len(groups):
                        print(f"Warning: attribute out of bounds, skipping offset {i * GROUP_SIZE:04X}h")
                        continue

                    raw_sequence = groups[i:end_idx]

                    # Convert bytes to strings such as "31 7a 00 ee 0b 00 00 00"
                    formatted_sequence = [' '.join(f"{byte:02x}" for byte in group) for group in raw_sequence]

                    # Resident or not (lowest bit of the first byte of the 2nd group)
                    is_resident = False
                    if len(raw_sequence) >= 2:
                        second_group = raw_sequence[1]
                        is_resident = (second_group[0] & 0x01) == 0x00

                    result_entry = {
                        'start_byte': sector_number * SECTOR_SIZE,  # new field: starting byte of the file's MFT Entry
                        'offset': i * GROUP_SIZE,
                        'sequence': formatted_sequence,
                        'is_resident': is_resident,
                        'total_groups': total_groups,
                        'attribute_length': attribute_length
                    }

                    results.append(result_entry)

                    # resident_str = "resident" if is_resident else "non-resident"
                    # print(f"\nFound {resident_str} $80 attribute at offset {i * GROUP_SIZE:04X}h:")
                    # print(f"Total attribute length: {attribute_length} bytes -> reading {total_groups} groups:")
                    # for j, group in enumerate(formatted_sequence):
                    #     print(f"Group {j + 1}: {group}")
                    #
                    # print(f"\nFound {len(results)} matching sequences in total")

            return results

    except PermissionError:
        print("Error: administrator privileges are required to access the disk device")
    except Exception as e:
        print(f"An error occurred: {str(e)}")

    return results


def GetFile80hPattern(file_path):
    volume_letter = file_path.split(':')[0]
    try:
        mft_entry_value = GetFileMftEntry(file_path)
        StartSector = CalculateFileMftStartSector(mft_entry_value, volume_letter)
        # print(f"File information and $80 attribute content:")
        # print(Get80hPattern(StartSector, volume_letter))
        file80h_pattern = Get80hPattern(StartSector, volume_letter)
        return file80h_pattern
    except Exception as e:
        print(f"❌ Error: {e}")
        return None


# if __name__ == '__main__':
#     data = GetFile80hPattern(r"Z:\hello.txt")
#     print(data)


def ExtractSequenceHexValues(file80h_pattern):
    """
    Extract every hex string in each entry's 'sequence' and merge them into a
    single flat list.

    Parameters:
        data (list): list of dicts, each with a 'sequence' key

    Returns:
        list: merged list of all sequence values
    """
    sequence_list = []
    for entry in file80h_pattern:
        if 'sequence' in entry:
            # Split each hex string on spaces and merge into the result list
            for hex_str in entry['sequence']:
                # Split the string and extend the result
                sequence_list.extend(hex_str.split())
    return sequence_list


def ExportDataRunList(data_run_list):
    """
    Split data_run_list into individual Data Run fragments.
    """
    result = []
    pos = 0
    while pos < len(data_run_list):
        current_byte = data_run_list[pos]
        if current_byte == '00':
            break
        try:
            header = int(current_byte, 16)
            len_bytes = (header >> 4) & 0x0F
            offset_bytes = header & 0x0F

            run_length = 1 + offset_bytes + len_bytes
            if pos + run_length > len(data_run_list):
                print(f"⚠️ Data out of bounds, stopping parse")
                break

            fragment = data_run_list[pos: pos + run_length]
            result.append(fragment)
            pos += run_length
        except Exception as e:
            print(f"❌ Failed to parse Data Run: position {pos}, error: {e}")
            pos += 1  # skip one byte and keep parsing
    return result


def hex_list_to_int(lst, byteorder='little'):
    """
    Convert a list of hex strings to an integer (little-endian supported).
    """
    if byteorder == 'little':
        lst = list(reversed(lst))
    return int(''.join(f"{int(b, 16):02x}" for b in lst), 16)


def parse_data_run(data_run, previous_cluster=0, cluster_size=512):
    """
    Parse a single NTFS Data Run and return its starting byte, ending byte,
    and length in bytes.

    Parameters:
        data_run (list): hex-string list for the Data Run
        previous_cluster (int): last cluster of the previous run (for relative offsets)
        cluster_size (int): cluster size (512 bytes by default)

    Returns:
        dict: starting byte, ending byte, length, and related information
    """
    if not data_run or data_run[0] == '00':
        return None

    header = int(data_run[0], 16)
    len_bytes = (header >> 4) & 0x0F
    offset_bytes = header & 0x0F

    if len(data_run) < 1 + offset_bytes + len_bytes:
        print(f"⚠️ Not enough data to parse the Data Run")
        return None

    # Extract the offset field and the length field
    offset_data = data_run[1:1 + offset_bytes]
    length_data = data_run[1 + offset_bytes:1 + offset_bytes + len_bytes]

    # Little-endian to integer
    def hex_list_to_int(lst):
        return int(''.join(f"{int(b, 16):02x}" for b in reversed(lst)), 16)

    offset = hex_list_to_int(offset_data)
    run_length = hex_list_to_int(length_data)

    # Compute the starting cluster
    starting_cluster = previous_cluster + offset
    ending_cluster = starting_cluster + run_length - 1

    # Convert to byte offsets
    cluster_per_sector = 8
    byte_per_sector = cluster_size
    byte_length = starting_cluster * cluster_per_sector * byte_per_sector
    starting_byte = run_length * cluster_per_sector * byte_per_sector
    ending_byte = starting_byte + byte_length - 1

    return {
        "starting_byte": starting_byte,
        "ending_byte": ending_byte,
        "byte_length": byte_length,
        "starting_cluster": starting_cluster,
        "run_length_clusters": run_length
    }


def ParseMultipleDataRuns(fragments, cluster_size=512):
    """
    Parse multiple Data Run fragments in bulk and return byte offsets.

    Parameters:
        fragments (list): list of Data Run string lists
        cluster_size (int): cluster size (512 by default)

    Returns:
        list: one dict of byte-offset information per fragment
    """
    results = []
    previous_starting_cluster = 0

    for fragment in fragments:
        result = parse_data_run(fragment, previous_starting_cluster, cluster_size)

        if result:
            results.append(result)
            previous_starting_cluster = result["starting_cluster"]

    return results


def GetFragmentData(file80h_pattern):
    if not file80h_pattern or not isinstance(file80h_pattern, list):
        return []

    if file80h_pattern[0].get('is_resident'):
        start_byte = file80h_pattern[0].get('start_byte')
        offset = file80h_pattern[0].get('offset')
        content_start = file80h_pattern[0].get('sequence')[2]

        content_start_list = content_start.split()
        content_len = content_start_list[::-1][4:8]
        content_offset = content_start_list[::-1][:4]

        content_len_str = ''.join(content_len)
        content_len_decimal_value = int(content_len_str, 16)
        content_offset_str = ''.join(content_offset)
        content_offset_decimal_value = int(content_offset_str, 16)

        file_offset = start_byte + offset + content_offset_decimal_value

        return [{
            'starting_byte': file_offset,
            'byte_length': content_len_decimal_value
        }]

    else:
        sequence_list = ExtractSequenceHexValues(file80h_pattern)
        data_run_offset = sequence_list[32:34][::-1]
        data_run_offset_str = ''.join(data_run_offset)
        data_run_offset_decimal_value = int(data_run_offset_str, 16)
        data_run_list = sequence_list[data_run_offset_decimal_value:]
        fragments = ExportDataRunList(data_run_list)
        results = ParseMultipleDataRuns(fragments)
        return results

# if __name__ == '__main__':
#     arri80_data = GetFile80hPattern(r"Z:\hello.txt")
#     data = GetFragmentData(arri80_data)
#     print(data)
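A minimal worked example of the attribute-length decode used in Get80hPattern: in an NTFS attribute header, bytes 4..7 hold the attribute's total length as a little-endian DWORD.

header = bytes.fromhex("8000000048000000")  # $80 attribute, length 0x48
attribute_length = int.from_bytes(header[4:8], byteorder='little')
print(attribute_length)  # 72 -> (72 + 7) // 8 = 9 groups of 8 bytes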
Binary file not shown.
139  test/export_useful_fragments.py  Normal file
@@ -0,0 +1,139 @@
def extract_data_run_fragments(data_run):
    """
    Extract the Data Runs in data_run as independent list fragments.

    Parameters:
        data_run (list): list of hex strings representing the Data Run content

    Returns:
        list: each element is a list representing a single Data Run
    """
    result = []
    pos = 0

    while pos < len(data_run):
        current_byte = data_run[pos]

        if current_byte == '00':
            # Empty run block: stop parsing
            break

        try:
            header = int(current_byte, 16)
            len_bytes = (header >> 4) & 0x0F
            offset_bytes = header & 0x0F

            if len_bytes == 0 or offset_bytes == 0:
                print(f"⚠️ Invalid field length, skipping position {pos}")
                break

            # Total length of this Data Run
            run_length = 1 + offset_bytes + len_bytes

            # Slice out this Data Run
            fragment = data_run[pos: pos + run_length]

            result.append(fragment)

            # Advance the cursor
            pos += run_length

        except Exception as e:
            print(f"❌ Parse failed at position {pos}: {e}")
            break

    return result


def hex_list_to_int(lst, byteorder='little'):
    """
    Convert a list of hex strings to an integer (little-endian supported).
    """
    if byteorder == 'little':
        lst = list(reversed(lst))
    return int(''.join(f"{int(b, 16):02x}" for b in lst), 16)


def parse_data_run(data_run, previous_cluster=0):
    """
    Parse a single NTFS Data Run and return its starting and ending cluster numbers.

    Parameters:
        data_run (list): hex-string list for the Data Run
        previous_cluster (int): last cluster of the previous run (for relative offsets)

    Returns:
        dict: starting cluster, ending cluster, run length, and so on
    """
    if not data_run or data_run[0] == '00':
        return None

    header = int(data_run[0], 16)
    len_bytes = (header >> 4) & 0x0F
    offset_bytes = header & 0x0F

    # Extract the offset field and the length field (offset first, then length)
    offset_data = data_run[1:1 + offset_bytes]
    length_data = data_run[1 + offset_bytes:1 + offset_bytes + len_bytes]

    # Parse offset and length
    offset = hex_list_to_int(offset_data, 'little')
    run_length = hex_list_to_int(length_data, 'little')

    # Compute the starting cluster (absolute for the first run, relative afterwards)
    starting_cluster = previous_cluster + offset
    ending_cluster = starting_cluster + run_length - 1

    return {
        "starting_cluster": starting_cluster,
        "ending_cluster": ending_cluster,
        "run_length": run_length
    }


def parse_multiple_data_runs(fragments):
    """
    Parse multiple Data Run fragments in bulk, with relative offsets supported.

    Parameters:
        fragments (list): list of Data Run string lists, e.g.:
            [
                ['31', '7a', '00', 'ee', '0b'],
                ['22', '29', '06', 'bb', '00'],
                ...
            ]

    Returns:
        list: one dict per fragment with its parse result
    """
    results = []
    previous_starting_cluster = 0

    for fragment in fragments:
        result = parse_data_run(fragment, previous_starting_cluster)

        if result:
            results.append(result)
            previous_starting_cluster = result["starting_cluster"]

    return results


data_run = [
    '31', '7a', '00', 'ee', '0b',
    '22', '29', '06', 'bb', '00',
    '32', '7a', '02', 'ee', '00', '00',
    '00', 'a0', 'f8', 'ff', 'ff', 'ff', 'ff', 'ff'
]

# Step 1: extract all valid fragments
fragments = extract_data_run_fragments(data_run)
print("Extracted fragments:")
for i, frag in enumerate(fragments):
    print(f"Fragment {i + 1}: {frag}")

# Step 2: parse the fragments in bulk
results = parse_multiple_data_runs(fragments)
print("\nParse results:")
for i, res in enumerate(results):
    print(f"Fragment {i + 1}: {res}")
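Tracing the sample by hand under this script's field order (offset field first, then length field): fragment 1 ('31', '7a', '00', 'ee', '0b') yields offset 0x7a = 122 and run length 0x0bee00 = 780800, so starting_cluster 122 and ending_cluster 780921; fragments 2 and 3 chain relative to the previous start, landing at starting clusters 1699 and 2333.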
36  test/fake_main.py  Normal file
@@ -0,0 +1,36 @@
from files_save import CopyMultiFragmentFiles, CopySingleFragmentFiles

target_path = r"Z:\Recovered"
# Stores the list of fragment contents for each file
fragment_lists = {}
test_file_sort = [{'absolute_path': 'Y:\\CloudMusic\\AGA - 一.mp3', 'filename': 'AGA - 一.mp3', 'extent_count': 2,
                   'start_byte': 23162880, 'length': 69632, 'fragment_index': 1},
                  {'absolute_path': 'Y:\\CloudMusic\\Aaron Zigman - Main Title.mp3',
                   'filename': 'Aaron Zigman - Main Title.mp3', 'extent_count': 1, 'start_byte': 687685632,
                   'length': 7163904, 'fragment_index': 1},
                  {'absolute_path': 'Y:\\CloudMusic\\AGA - MIZU.mp3', 'filename': 'AGA - MIZU.mp3', 'extent_count': 1,
                   'start_byte': 694849536, 'length': 8126464, 'fragment_index': 1},
                  {'absolute_path': 'Y:\\CloudMusic\\AGA - 一.mp3', 'filename': 'AGA - 一.mp3', 'extent_count': 2,
                   'start_byte': 702976000, 'length': 10870784, 'fragment_index': 2},
                  {'absolute_path': 'Y:\\CloudMusic\\Anson Seabra - Keep Your Head Up Princess.mp3',
                   'filename': 'Anson Seabra - Keep Your Head Up Princess.mp3', 'extent_count': 1,
                   'start_byte': 713846784, 'length': 7970816, 'fragment_index': 1}, {
                      'absolute_path': 'Y:\\CloudMusic\\Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3',
                      'filename': 'Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3',
                      'extent_count': 1, 'start_byte': 721817600, 'length': 9179136, 'fragment_index': 1},
                  {'absolute_path': 'Y:\\CloudMusic\\Ava Max - Sweet but Psycho.mp3',
                   'filename': 'Ava Max - Sweet but Psycho.mp3', 'extent_count': 1, 'start_byte': 731000832,
                   'length': 7938048, 'fragment_index': 1},
                  {'absolute_path': 'Y:\\CloudMusic\\Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3',
                   'filename': 'Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3', 'extent_count': 1,
                   'start_byte': 738938880, 'length': 6791168, 'fragment_index': 1},
                  {'absolute_path': 'Y:\\CloudMusic\\Color Music Choir - Something Just Like This (Live).mp3',
                   'filename': 'Color Music Choir - Something Just Like This (Live).mp3', 'extent_count': 1,
                   'start_byte': 745730048, 'length': 6193152, 'fragment_index': 1}]

for item in test_file_sort:
    extent_count = item['extent_count']
    if extent_count == 1:
        CopySingleFragmentFiles(item, target_path)
    elif extent_count > 1:
        CopyMultiFragmentFiles(item, fragment_lists, target_path)
160  test/files_save.py  Normal file
@@ -0,0 +1,160 @@
import os


def ExtractVolumeLetter(path: str) -> str:
    """Extract the drive letter from an absolute path."""
    drive = os.path.splitdrive(path)[0]
    if not drive:
        raise ValueError(f"Cannot extract a drive letter from path: {path}")
    return drive[0].upper()  # returns e.g. 'Y'


def CopySingleFragmentFiles(source_data_dict, target_path):
    """
    Read data from the raw volume at the given start byte and length, and save it
    as the target file.

    :param source_data_dict: dict describing the source data
    :param target_path: target folder path
    """
    start_byte = source_data_dict.get("start_byte")
    byte_length = source_data_dict.get("length")
    absolute_path = source_data_dict.get("absolute_path")
    file_name = source_data_dict.get("filename")

    if not absolute_path or not file_name:
        print("Error: required file information is missing")
        return

    if byte_length is None or byte_length <= 0:
        print("Error: invalid byte length")
        return

    source_disk_path = ExtractVolumeLetter(absolute_path)
    target_file_path = os.path.join(target_path, file_name)

    try:
        # Create the target directory if it does not exist
        os.makedirs(target_path, exist_ok=True)

        with open(fr"\\.\{source_disk_path}:", 'rb') as disk:
            disk.seek(start_byte)

            with open(target_file_path, 'wb') as f:
                remaining = byte_length
                CHUNK_SIZE = 1024 * 1024  # 1MB
                while remaining > 0:
                    read_size = min(CHUNK_SIZE, remaining)
                    chunk = disk.read(read_size)
                    if not chunk:
                        print("Warning: read returned no data; the end of the disk may have been reached.")
                        break
                    f.write(chunk)
                    remaining -= len(chunk)

        print(
            f"Success: read {byte_length} bytes from byte offset {start_byte}, saved as {target_file_path}")

    except PermissionError:
        print("Error: accessing the disk device requires administrator rights; run this program as administrator")
    except Exception as e:
        print(f"An error occurred: {str(e)}")


def ReadDiskBytes(volume_letter: str, start_byte: int, length: int) -> bytes:
    """
    Read `length` bytes from the given volume, starting at `start_byte`.

    :param volume_letter: drive letter (e.g. "Y")
    :param start_byte: starting byte offset (int)
    :param length: number of bytes to read (int)
    :return: the raw bytes that were read
    """
    if not isinstance(volume_letter, str) or len(volume_letter.strip()) != 1:
        raise ValueError("volume_letter must be a single letter, e.g. 'Y'")

    # Build the Windows device path, e.g. \\.\Y:
    disk_path = f"\\\\.\\{volume_letter.strip().upper()}:"

    try:
        with open(disk_path, "rb") as disk:
            disk.seek(start_byte)
            data = disk.read(length)
            return data
    except PermissionError:
        raise PermissionError("Insufficient permissions; run the program as administrator")
    except Exception as e:
        raise RuntimeError(f"Failed to read from disk: {e}")


# if __name__ == "__main__":
#     drive = "Y"
#     start = 687685632
#     size = 7163904
#
#     try:
#         content = ReadDiskBytes(drive, start, size)
#         print(f"Successfully read {len(content)} bytes. First 100 bytes:")
#         print(content[:100])
#     except Exception as e:
#         print("Error:", e)
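
One caveat worth noting (an assumption about raw-volume access on Windows, not something this change enforces): reads on a \\.\X: volume handle generally have to be sector-aligned, so arbitrary start_byte values can fail. A minimal sketch of an aligned wrapper around ReadDiskBytes:

SECTOR = 512  # assumption: 512-byte sectors; query the volume for the real value

def ReadDiskBytesAligned(volume_letter: str, start_byte: int, length: int) -> bytes:
    """Hypothetical wrapper: widen the read to sector boundaries, then trim."""
    aligned_start = (start_byte // SECTOR) * SECTOR
    skew = start_byte - aligned_start
    aligned_length = ((skew + length + SECTOR - 1) // SECTOR) * SECTOR
    data = ReadDiskBytes(volume_letter, aligned_start, aligned_length)
    return data[skew:skew + length]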

def CopyMultiFragmentFiles(
        item: dict,
        fragment_lists: dict,
        target_path: str
):
    """
    Handle a file with multiple fragments: read each fragment, keep them in order,
    and write the reassembled content to disk once all fragments are present.

    :param item: dict describing one fragment of the file
    :param fragment_lists: dict buffering each file's fragment contents
    :param target_path: target folder for the recovered file
    :return: None
    """
    file_name = item['filename']
    extent_count = item['extent_count']
    fragment_index = item['fragment_index']
    start_byte = item['start_byte']
    length_byte = item['length']

    volume_letter = ExtractVolumeLetter(item['absolute_path'])

    # Read this fragment's content
    fragment_content = ReadDiskBytes(volume_letter, start_byte, length_byte)

    # Initialise the slot list for this file on first sight
    if file_name not in fragment_lists:
        fragment_lists[file_name] = [None] * extent_count

    # Store the content at its ordinal position
    if fragment_index <= extent_count:
        fragment_lists[file_name][fragment_index - 1] = fragment_content
        print(f"Stored fragment {fragment_index} of {file_name}.")
    else:
        print(f"Warning: fragment_index out of range for {file_name}: {fragment_index} / {extent_count}")

    # Once every fragment is loaded, write the file out
    fragments = fragment_lists[file_name]
    if None not in fragments:
        full_content = b''.join(fragments)
        target_file_path = os.path.join(target_path, file_name)
        try:
            with open(target_file_path, 'wb') as f:
                f.write(full_content)
            print(f"Successfully recovered file: {file_name}")
        except Exception as e:
            print(f"Failed to write file {file_name}, error: {e}")


if __name__ == "__main__":
    test_dict = {
        'absolute_path': 'Y:\\CloudMusic\\Aaron Zigman - Main Title.mp3',
        'filename': 'Aaron Zigman - Main Title.mp3',
        'extent_count': 1,
        'start_byte': 687685632,
        'length': 7163904,
        'fragment_index': 1
    }

    CopySingleFragmentFiles(test_dict, target_path=r"Z:\RecoveredFiles")
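
As a usage sketch (offsets taken from the sample data in test/fake_main.py above, assuming the volume is readable and the target folder exists), a two-extent file is passed to CopyMultiFragmentFiles once per fragment; the second call completes the buffer and writes the file:

fragments = {}
part1 = {'absolute_path': 'Y:\\CloudMusic\\AGA - 一.mp3', 'filename': 'AGA - 一.mp3',
         'extent_count': 2, 'start_byte': 23162880, 'length': 69632, 'fragment_index': 1}
part2 = dict(part1, start_byte=702976000, length=10870784, fragment_index=2)
CopyMultiFragmentFiles(part1, fragments, r"Z:\RecoveredFiles")  # buffers fragment 1 of 2
CopyMultiFragmentFiles(part2, fragments, r"Z:\RecoveredFiles")  # completes and writes the file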
232
test/files_sort.py
Normal file
@@ -0,0 +1,232 @@
import sqlite3


def GetFilesDBPathInfo(db_path: str = "../src/db_ntfs_info.db",
                       table_name: str = "db_path",
                       files_path=None) -> list:
    """
    For each file path in the given list, look up the matching record's ID and Name
    in the given table.

    :param db_path: database file path
    :param table_name: name of the table to query
    :param files_path: list of full file paths
    :return: list of results, each item {'absolute_path': str, 'id': int, 'name': str}
    """
    if files_path is None:
        files_path = []
    results = []

    # Connect to the database
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    for path in files_path:
        try:
            # The table name is interpolated into the string; parameterised queries apply only to values
            sql = f"SELECT ID, Name FROM {table_name} WHERE Path = ?"
            cursor.execute(sql, (path,))
            row = cursor.fetchone()
            if row:
                results.append({
                    'absolute_path': path,
                    'id': row[0],
                    'name': row[1]
                })
            else:
                print(f"No matching record found: {path}")
        except Exception as e:
            print(f"Query failed: {path}, error: {e}")

    conn.close()
    return results


# if __name__ == "__main__":
#     test_files = [
#         r"CloudMusic/AGA - MIZU.mp3",
#         r"CloudMusic/AGA - 一.mp3",
#         r"CloudMusic/Aaron Zigman - Main Title.mp3",
#         r"CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3",
#         r"CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3",
#         r"CloudMusic/Ava Max - Sweet but Psycho.mp3",
#         r"CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3",
#         r"CloudMusic/Color Music Choir - Something Just Like This (Live).mp3"
#     ]
#
#     result = GetFilesDBPathInfo(files_path=test_files)
#     for item in result:
#         print(item)


def GetFilesDBNodeInfo(db_path: str = "../src/db_ntfs_info.db", table_name: str = "db_node",
                       path_records: list = None) -> list:
    """
    Use the IDs from the db_path query results to look up the matching extent
    (fragment) information in the db_node table.

    :param db_path: database file path
    :param table_name: db_node table name
    :param path_records: result list from GetFilesDBPathInfo
    :return: result list carrying each file's fragment information
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    results = []

    for record in path_records:
        path_id = record['id']
        absolute_path = record['absolute_path']
        name = record['name']

        try:
            # Query db_node for the record whose PathID matches
            cursor.execute(f"SELECT * FROM {table_name} WHERE PathID = ?", (path_id,))
            row = cursor.fetchone()

            if not row:
                print(f"No record with PathID={path_id} found in table {table_name}")
                continue

            # Collect the column names so fields can be accessed by name
            columns = [desc[0] for desc in cursor.description]

            # Build a dict for access by column name
            node_data = dict(zip(columns, row))

            # Read ExtentCount
            extent_count = node_data.get("ExtentCount", 0)

            # Parse the fragment information
            fragments = []
            for i in range(1, 5):  # extent1 ~ extent4
                loc = node_data.get(f"extent{i}_Location")
                length = node_data.get(f"extent{i}_Length")

                if loc is not None and length is not None and length > 0:
                    fragments.append({
                        "start_byte": loc,
                        "length": length
                    })

            results.append({
                "absolute_path": absolute_path,
                "name": name,
                "path_id": path_id,
                "extent_count": extent_count,
                "fragments": fragments
            })

        except Exception as e:
            print(f"Query failed: PathID={path_id}, error: {e}")

    conn.close()
    return results


if __name__ == "__main__":
    test_files = [
        r"CloudMusic/AGA - MIZU.mp3",
        r"CloudMusic/AGA - 一.mp3",
        r"CloudMusic/Aaron Zigman - Main Title.mp3",
        r"CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3",
        r"CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3",
        r"CloudMusic/Ava Max - Sweet but Psycho.mp3",
        r"CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3",
        r"CloudMusic/Color Music Choir - Something Just Like This (Live).mp3"
    ]

    # Step 1: fetch ID and Name from the db_path table
    path_info = GetFilesDBPathInfo(files_path=test_files)

    # Step 2: use each PathID to fetch fragment info from the db_node table
    file_extents_info = GetFilesDBNodeInfo(path_records=path_info)

    # Print the results
    for item in file_extents_info:
        print(item)


def sort_fragments_by_start_byte(file_extents_list: list) -> list:
    """
    Sort all files' fragments by start_byte and label each with its ordinal fragment index.

    :param file_extents_list: result list from GetFilesDBNodeInfo
    :return: fragment list sorted by start_byte, carrying file path, filename and fragment index
    """
    all_fragments = []

    for file_info in file_extents_list:
        absolute_path = file_info['absolute_path']
        filename = file_info['name']
        extent_count = file_info['extent_count']
        fragments = file_info['fragments']

        # Sort this file's fragments (they are usually already in order)
        sorted_fragments = sorted(fragments, key=lambda x: x['start_byte'])

        # Attach the per-file fragment index
        for idx, fragment in enumerate(sorted_fragments, start=1):
            all_fragments.append({
                'absolute_path': absolute_path,
                'filename': filename,
                'extent_count': extent_count,
                'start_byte': fragment['start_byte'],
                'length': fragment['length'],
                'fragment_index': idx
            })

    # Global sort: order all fragments by start_byte
    all_fragments.sort(key=lambda x: x['start_byte'])

    return all_fragments


# if __name__ == "__main__":
#     test_files = [
#         r"CloudMusic/AGA - MIZU.mp3",
#         r"CloudMusic/AGA - 一.mp3",
#         r"CloudMusic/Aaron Zigman - Main Title.mp3",
#         r"CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3",
#         r"CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3",
#         r"CloudMusic/Ava Max - Sweet but Psycho.mp3",
#         r"CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3",
#         r"CloudMusic/Color Music Choir - Something Just Like This (Live).mp3"
#     ]
#     test_files_sort = [
#         {'absolute_path': 'CloudMusic/AGA - MIZU.mp3', 'name': 'AGA - MIZU.mp3', 'path_id': 6, 'extent_count': 1,
#          'fragments': [{'start_byte': 694849536, 'length': 8126464}]},
#         {'absolute_path': 'CloudMusic/AGA - 一.mp3', 'name': 'AGA - 一.mp3', 'path_id': 7, 'extent_count': 2,
#          'fragments': [{'start_byte': 702976000, 'length': 10870784}, {'start_byte': 23162880, 'length': 69632}]},
#         {'absolute_path': 'CloudMusic/Aaron Zigman - Main Title.mp3', 'name': 'Aaron Zigman - Main Title.mp3',
#          'path_id': 5, 'extent_count': 1, 'fragments': [{'start_byte': 687685632, 'length': 7163904}]},
#         {'absolute_path': 'CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3',
#          'name': 'Anson Seabra - Keep Your Head Up Princess.mp3', 'path_id': 8, 'extent_count': 1,
#          'fragments': [{'start_byte': 713846784, 'length': 7970816}]},
#         {'absolute_path': 'CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3',
#          'name': 'Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3', 'path_id': 9,
#          'extent_count': 1, 'fragments': [{'start_byte': 721817600, 'length': 9179136}]},
#         {'absolute_path': 'CloudMusic/Ava Max - Sweet but Psycho.mp3', 'name': 'Ava Max - Sweet but Psycho.mp3',
#          'path_id': 10, 'extent_count': 1, 'fragments': [{'start_byte': 731000832, 'length': 7938048}]},
#         {'absolute_path': 'CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3',
#          'name': 'Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3', 'path_id': 11, 'extent_count': 1,
#          'fragments': [{'start_byte': 738938880, 'length': 6791168}]},
#         {'absolute_path': 'CloudMusic/Color Music Choir - Something Just Like This (Live).mp3',
#          'name': 'Color Music Choir - Something Just Like This (Live).mp3', 'path_id': 12, 'extent_count': 1,
#          'fragments': [{'start_byte': 745730048, 'length': 6193152}]}]
#
#     path_info = GetFilesDBPathInfo(files_path=test_files)
#     file_extents_data = GetFilesDBNodeInfo(path_records=path_info)
#
#     # Sort fragments by their position on disk
#     single_fragment_result = sort_fragments_by_start_byte(file_extents_data)
#
#     # Simulated multi-fragment data, sorted the same way
#     multi_fragment_result = sort_fragments_by_start_byte(test_files_sort)
#
#     print("Single-file fragment sort results:")
#     for item in single_fragment_result:
#         print(item)
#
#     print("\nMulti-file fragment sort results:")
#     for item in multi_fragment_result:
#         print(item)
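
A small sketch of the index-then-sort behaviour (data reduced from the commented test above): fragment_index is assigned per file before the global sort, so a two-extent file keeps indices 1 and 2 even when other files' fragments land between them in the final disk-order list.

demo = sort_fragments_by_start_byte([
    {'absolute_path': 'CloudMusic/AGA - 一.mp3', 'name': 'AGA - 一.mp3', 'path_id': 7,
     'extent_count': 2,
     'fragments': [{'start_byte': 702976000, 'length': 10870784},
                   {'start_byte': 23162880, 'length': 69632}]},
])
for frag in demo:
    print(frag['fragment_index'], frag['start_byte'])
# -> 1 23162880, then 2 702976000: disk order, with per-file indices preserved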
199
test/folders_sort.py
Normal file
@@ -0,0 +1,199 @@
import os
import sqlite3

from files_sort import GetFilesDBNodeInfo, sort_fragments_by_start_byte


def GetFolderID(
        folder_path: str,
        db_path: str = "../src/db_ntfs_info.db",
        table_name: str = "db_path"
) -> int | None:
    """
    Look up the ID of the given folder path in the database.

    :param folder_path: folder path (e.g. r"CloudMusic\\")
    :param db_path: database file path
    :param table_name: table to query, defaults to 'db_path'
    :return: the ID (int) on success, None on failure
    """

    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    try:
        # Build the SQL query with the table name
        sql = f"SELECT ID FROM {table_name} WHERE Path = ?"
        cursor.execute(sql, (folder_path,))
        result = cursor.fetchone()

        if result:
            return result[0]
        else:
            print(f"Path not found: {folder_path} in table {table_name}")
            return None

    except sqlite3.Error as e:
        print(f"Database operation failed: {e}")
        return None

    finally:
        conn.close()


def GetSubPathsByParentID(
        parent_id: int,
        db_path: str = "../src/db_ntfs_info.db",
        table_name: str = "db_path"
) -> list:
    """
    Query the db_path table for the children (files/folders) of the given ParentID.

    :param parent_id: parent node ID
    :param db_path: database file path
    :param table_name: table name
    :return: list of dicts carrying ID, Path and Name
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    sql = f"""
        SELECT ID, Path, Name
        FROM {table_name}
        WHERE ParentID = ?
    """

    try:
        cursor.execute(sql, (parent_id,))
        rows = cursor.fetchall()
    except Exception as e:
        print(f"Database query failed: {e}")
        conn.close()
        return []

    results = []
    for row in rows:
        item = {
            'id': row[0],
            'absolute_path': row[1],
            'name': row[2]
        }
        results.append(item)

    conn.close()
    return results


if __name__ == "__main__":
    test_folder_path = "pictures/"
    parent_id_test = GetFolderID(test_folder_path)
    # node_data = GetNodeFragmentsByParentID(parent_id_test)
    path_data = GetSubPathsByParentID(parent_id_test)
    node_data = GetFilesDBNodeInfo(path_records=path_data)
    for data in node_data:
        print(data)


def GetSortFragmentsByFolderPath(db_path: str = "../src/db_ntfs_info.db", folder_path: str = None) -> list:
    """
    Query the fragment information of every file under the given folder path.

    :param db_path: database to query
    :param folder_path: absolute path of the folder
    :return list: all of the folder's files, ordered by fragment position
    """
    parent_id = GetFolderID(folder_path=folder_path, db_path=db_path)
    path_data = GetSubPathsByParentID(parent_id=parent_id, db_path=db_path)
    node_data = GetFilesDBNodeInfo(path_records=path_data)
    result = sort_fragments_by_start_byte(node_data)

    return result


# if __name__ == "__main__":
#     folder_path_test = "pictures/"
#     data = GetSortFragmentsByFolderPath(db_path="../src/db_ntfs_info.db", folder_path=folder_path_test)
#     for item in data:
#         print(item)
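
Note that GetSubPathsByParentID matches ParentID directly, so GetSortFragmentsByFolderPath only covers a folder's immediate children. If nested subfolders also need to be gathered, one option is a recursive walk over the same table (a hypothetical sketch, not part of this change; it assumes deeper db_path rows reference their containing folder's ID via ParentID):

def GetSubPathsRecursive(parent_id, db_path="../src/db_ntfs_info.db", table_name="db_path"):
    """Hypothetical helper: depth-first walk of ParentID links to include nested entries."""
    results = []
    for item in GetSubPathsByParentID(parent_id, db_path, table_name):
        results.append(item)
        # File rows simply yield no children, so recursing on every ID is safe
        results.extend(GetSubPathsRecursive(item['id'], db_path, table_name))
    return results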

def ScanDirectory(root_dir, skip_system=True):
    """
    Recursively scan the given directory and return the paths relative to the drive
    (using '/' as separator), without the drive letter.

    :param root_dir: root directory to scan
    :param skip_system: whether to skip system directories (default True)
    :return: list of file paths in the form relative/path/to/file.ext
    """
    file_list = []

    for root, dirs, files in os.walk(root_dir):
        # Skip system directories
        if skip_system:
            dirs[:] = [d for d in dirs if not d.startswith('$') and d != "System Volume Information"]

        for file in files:
            full_path = os.path.join(root, file)

            # Drop the drive letter
            _, relative_path = os.path.splitdrive(full_path)

            # Replace \ with /
            relative_path = relative_path.lstrip("\\").replace("\\", "/")

            file_list.append(relative_path)

    return file_list


# if __name__ == "__main__":
#     folder_path = r"Y:/folder1/"
#     files_list = ScanDirectory(folder_path)
#
#     print(f"Found {len(files_list)} files:")
#     for f in files_list:
#         print(f)


def ScanMultiFolders(folder_paths, skip_system=True):
    """
    Scan several root directories and return the file paths from all of their subdirectories.

    :param folder_paths: list of root directories
    :param skip_system: whether to skip system directories (default True)
    :return: list of relative paths for every file (in the form folder/file.ext)
    """
    all_files = []

    for root_dir in folder_paths:
        # Make sure the path exists
        if not os.path.exists(root_dir):
            print(f"⚠️ Path does not exist: {root_dir}")
            continue

        for root, dirs, files in os.walk(root_dir):
            # Skip system directories
            if skip_system:
                dirs[:] = [d for d in dirs if not d.startswith('$') and d != "System Volume Information"]

            for file in files:
                full_path = os.path.join(root, file)

                # Drop the drive letter
                _, relative_path = os.path.splitdrive(full_path)
                relative_path = relative_path.lstrip("\\").replace("\\", "/")

                all_files.append(relative_path)

    return all_files


if __name__ == "__main__":
    folders = [
        r"Y:\CloudMusic",
        r"Y:\folder1"
    ]

    files = ScanMultiFolders(folders)

    print(f"Found {len(files)} files:")
    for f in files:
        print(f)
92
test/get_extent_counts.py
Normal file
@@ -0,0 +1,92 @@
def analyze_ntfs_data_attribute(data):
    """
    Analyse the 0x80 ($DATA) attribute in an NTFS structure and return the file's
    fragment count.

    Parameters:
        data (list): list of dicts, each of which needs a 'sequence' key
                     (see the sample structure below)

    Returns:
        int: fragment count (1 for a resident attribute, otherwise the number of data runs)

    Raises:
        ValueError: when the input data is invalid
    """
    # Step 1: pull out and flatten the sequence data
    hex_bytes = []
    for entry in data:
        if 'sequence' in entry:
            for hex_str in entry['sequence']:
                hex_bytes.extend(hex_str.split())

    print(hex_bytes)  # debug output of the raw hex byte list

    # Convert the hex strings into a list of integers
    try:
        attribute_data = [int(x, 16) for x in hex_bytes]
    except ValueError:
        raise ValueError("Invalid hexadecimal data")

    # Step 2: analyse the attribute structure
    if len(attribute_data) < 24:
        raise ValueError("Attribute data too short to parse the header")

    # Check the attribute type (0x80)
    if attribute_data[0] != 0x80:
        raise ValueError("Not a 0x80 ($DATA) attribute")

    # Check the resident flag (offset 0x08)
    is_resident = attribute_data[8] == 0

    if is_resident:
        return 1
    else:
        # Parse the non-resident attribute's data run list
        data_run_offset = attribute_data[0x20] | (attribute_data[0x21] << 8)

        if data_run_offset >= len(attribute_data):
            raise ValueError("Data run offset exceeds the attribute length")

        data_runs = attribute_data[data_run_offset:]
        fragment_count = 0
        pos = 0

        while pos < len(data_runs):
            header_byte = data_runs[pos]
            if header_byte == 0x00:
                break

            # Low 4 bits: size of the length field; high 4 bits: size of the offset field
            len_len = header_byte & 0x0F
            offset_len = (header_byte >> 4) & 0x0F

            if len_len == 0 or offset_len == 0:
                break

            pos += 1 + len_len + offset_len
            fragment_count += 1

        return fragment_count


input_data = [
    {
        'start_byte': 3221267456,
        'offset': 264,
        'sequence': [
            '80 00 00 00 48 00 00 00',
            '01 00 00 00 00 00 01 00',
            '00 00 00 00 00 00 00 00',
            '79 00 00 00 00 00 00 00',
            '40 00 00 00 00 00 00 00',
            '00 a0 07 00 00 00 00 00',
            '0b 93 07 00 00 00 00 00',
            '0b 93 07 00 00 00 00 00',
            '31 7a 00 ee 0b 00 00 00'
        ],
        'is_resident': False,
        'total_groups': 9,
        'attribute_length': 72
    }
]

print(analyze_ntfs_data_attribute(input_data))  # prints the fragment count
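
As a worked example, the three-run list parsed at the top of this change advances through the counting loop as follows (each run costs one header byte plus its two fields):

31 7a 00 ee 0b     -> header 0x31: 1 length byte + 3 offset bytes, run 1 (5 bytes)
22 29 06 bb 00     -> header 0x22: 2 length bytes + 2 offset bytes, run 2 (5 bytes)
32 7a 02 ee 00 00  -> header 0x32: 2 length bytes + 3 offset bytes, run 3 (6 bytes)
00                 -> terminator: fragment_count == 3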
105
test/parse_80_attribution.py
Normal file
@@ -0,0 +1,105 @@
def ParseDataRuns(data_bytes: list, cluster_size=512):
    """
    Parse the data runs in an NTFS $80 attribute and return each fragment's
    starting byte offset and length.

    Parameters:
        data_bytes (list): list of hex strings holding the complete $80 attribute.
        cluster_size (int): cluster size in bytes (default 512)

    Returns:
        dict: per-fragment information, shaped like (figures assume 4096-byte clusters):
            {
                "is_resident": False,
                "data_runs": {
                    "fragment 1": {"start_byte": 3202351104, "byte_length": 499712 - 1},
                    "fragment 2": {...}
                }
            }
    """

    def hex_list_to_int(lst, length, byteorder='little'):
        """Take `length` bytes from the list and convert them to an integer."""
        bytes_data = bytes([int(x, 16) for x in lst[:length]])
        return int.from_bytes(bytes_data, byteorder=byteorder)

    result = {
        "is_resident": True,
        "data_runs": {}
    }

    # Check that this is a $80 attribute
    if data_bytes[0] != '80':
        raise ValueError("Not a $80 attribute")

    # The resident flag sits at offset 0x08 (the 9th byte)
    is_resident = data_bytes[8] == '00'
    result["is_resident"] = is_resident

    if is_resident:
        result["data_runs"]["resident file"] = {
            "start_byte": 0,
            "byte_length": "file is resident, no fragments"
        }
        return result

    # Non-resident attribute: the data run offset is the WORD at offset 0x20
    data_run_offset = hex_list_to_int(data_bytes[0x20:0x20 + 2], 2)
    if data_run_offset >= len(data_bytes):
        raise ValueError("Data run offset out of range")

    # Slice out the data run area
    data_run_bytes = data_bytes[data_run_offset:]
    pos = 0
    fragment_index = 1

    while pos < len(data_run_bytes):
        header_byte = int(data_run_bytes[pos], 16)
        if header_byte == 0x00:
            break

        # Low 4 bits: size of the length field; high 4 bits: size of the offset field
        len_len = header_byte & 0x0F
        offset_len = (header_byte >> 4) & 0x0F

        if len_len == 0 or offset_len == 0:
            break

        pos += 1

        # In a data run, the length field comes first (little-endian)...
        length_bytes = data_run_bytes[pos:pos + len_len]
        length = hex_list_to_int(length_bytes, len_len, byteorder='little')

        # ...followed by the offset field (little-endian)
        offset_bytes = data_run_bytes[pos + len_len:pos + len_len + offset_len]
        offset = hex_list_to_int(offset_bytes, offset_len, byteorder='little')

        # Starting byte = offset (LCN) * cluster_size
        start_byte = offset * cluster_size
        byte_length = length * cluster_size - 1

        result["data_runs"][f"fragment {fragment_index}"] = {
            "start_byte": start_byte,
            "byte_length": byte_length
        }

        pos += len_len + offset_len
        fragment_index += 1

    return result


input_data = [
    '80', '00', '00', '00', '48', '00', '00', '00',
    '01', '00', '00', '00', '00', '00', '01', '00',
    '00', '00', '00', '00', '00', '00', '00', '00',
    '79', '00', '00', '00', '00', '00', '00', '00',
    '40', '00', '00', '00', '00', '00', '00', '00',
    '00', 'a0', '07', '00', '00', '00', '00', '00',
    '0b', '93', '07', '00', '00', '00', '00', '00',
    '0b', '93', '07', '00', '00', '00', '00', '00',
    '31', '7a', '00', 'ee', '0b', '00', '00', '00'
]

result = ParseDataRuns(input_data)
print(result)
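
Under the default 512-byte cluster size, the single run '31 7a 00 ee 0b' above resolves to 0x7a = 122 clusters starting at LCN 0x0bee00 = 781824, so the script prints roughly:

{'is_resident': False, 'data_runs': {'fragment 1': {'start_byte': 400293888, 'byte_length': 62463}}}

The example figures in the docstring (3202351104 and 499712 - 1) are the same run under a 4096-byte cluster assumption, i.e. ParseDataRuns(input_data, cluster_size=4096).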