Compare commits
5 commits: main ... new_db_sch

Author | SHA1 | Date
---|---|---
| f9e72de564 | |
| 542e334987 | |
| 975f7f3fbc | |
| ae777f75d9 | |
| 7d21842287 | |

db_manage/__init__.py | 0 | Normal file
@@ -23,12 +23,7 @@ def ClearTableRecordsWithReset(db_path, table_name):
if __name__ == '__main__':
    ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_path')
    # ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_path')
    # ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_device')
    # ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_config')
    ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_node')
    ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_device')
    ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_config')
    ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_user')
    ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_group')
    ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_extend_extent')
    ClearTableRecordsWithReset(db_path='../src/db_ntfs_info.db', table_name='db_extend_name')
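[Note] The body of ClearTableRecordsWithReset sits outside this hunk. For readers following the calls above, a minimal sketch of what such a delete-and-reset helper typically looks like (an assumption, not necessarily the repository's actual implementation):

import sqlite3

def ClearTableRecordsWithReset(db_path, table_name):
    # Delete every row, then reset the table's AUTOINCREMENT counter.
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute(f"DELETE FROM {table_name}")
    # sqlite_sequence stores AUTOINCREMENT counters; deleting the row restarts IDs at 1.
    cursor.execute("DELETE FROM sqlite_sequence WHERE name = ?", (table_name,))
    conn.commit()
    conn.close()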
@@ -86,9 +86,9 @@ def CreateDBDeviceTable(db_path='../src/db_ntfs_info.db', table_name='db_device'
def CreateDBNodeTable(db_path='../src/db_ntfs_info.db', table_name='db_node'):
    """
    Create a SQLite database at the given path and create the node-info table in it.
    Creates the NewDBNode table, which stores each file's concrete attributes and physical extent layout.

    :param db_path: str, the path of the database file
    :param db_path: str, database file path
    :param table_name: str, name of the table to create
    :return: None
    """
@@ -100,28 +100,18 @@ def CreateDBNodeTable(db_path='../src/db_ntfs_info.db', table_name='db_node'):
    # Connect to the SQLite database (the file is created automatically if it does not exist)
    conn = sqlite3.connect(db_path)

    # Create a cursor object
    conn.execute("PRAGMA foreign_keys = ON;")  # enable foreign-key support
    cursor = conn.cursor()

    # Dynamically build the CREATE TABLE statement
    create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
        ID INTEGER PRIMARY KEY AUTOINCREMENT,
        PathID INTEGER,
        ParentID INTEGER,
        NameHash TEXT,
        PathHash TEXT,
        PathID INTEGER NOT NULL,
        ExtendNameID INTEGER,
        DirLayer INTEGER,
        GroupID INTEGER,
        UserID INTEGER,
        FileCreateTime TEXT,
        FileModifyTime TEXT,
        FileAccessTime TEXT,
        FileAuthTime TEXT,
        FileSize INTEGER,
        FileMode INTEGER,
        FileHash TEXT,
        ExtentCount INTEGER,
        extent1_DeviceID INTEGER,
@@ -137,21 +127,17 @@ def CreateDBNodeTable(db_path='../src/db_ntfs_info.db', table_name='db_node'):
        extent4_Location INTEGER,
        extent4_Length INTEGER,

        -- Foreign-key constraints (optional)
        FOREIGN KEY(PathID) REFERENCES db_path(ID),
        FOREIGN KEY(ExtendNameID) REFERENCES extname_table(ID),
        FOREIGN KEY(GroupID) REFERENCES groups(ID),
        FOREIGN KEY(UserID) REFERENCES users(ID)
        -- Foreign-key constraints
        FOREIGN KEY(PathID) REFERENCES NewDBPath(ID),
        FOREIGN KEY(ExtendNameID) REFERENCES db_extend(ID),
        FOREIGN KEY(GroupID) REFERENCES db_group(ID),
        FOREIGN KEY(UserID) REFERENCES db_user(ID)
    );
    """

    # Execute the SQL statement
    cursor.execute(create_table_sql)

    # Commit the changes
    conn.commit()

    # Close the connection
    conn.close()

    print(f"Table [{table_name}] was created successfully in database [{db_path}]")
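[Note] SQLite only enforces the FOREIGN KEY clauses above when the pragma is enabled on the connection, which is why the new code issues PRAGMA foreign_keys = ON right after connecting. A minimal self-contained demonstration (hypothetical parent/child tables, not from this repository):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON;")  # off by default in SQLite; must be set per connection
conn.execute("CREATE TABLE parent (ID INTEGER PRIMARY KEY)")
conn.execute("CREATE TABLE child (ID INTEGER PRIMARY KEY, PID INTEGER, "
             "FOREIGN KEY(PID) REFERENCES parent(ID))")
try:
    conn.execute("INSERT INTO child (PID) VALUES (42)")  # no parent row 42 -> rejected
except sqlite3.IntegrityError as e:
    print("rejected:", e)  # FOREIGN KEY constraint failed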
@@ -280,12 +266,11 @@ def CreateDBExtendSnippetTable(db_path='../src/db_ntfs_info.db', table_name='db_
    print(f"Table [{table_name}] was created successfully in database [{db_path}]")


def CreateDBPathTable(db_path='../src/db_path.db', table_name='db_path'):
def CreateDBPathTable(db_path='../src/db_ntfs_info.db', table_name='db_path'):
    """
    Create a SQLite database at the given path and create the path-info table in it,
    with a DeviceID field that marks which device (disk) a file belongs to.
    Creates the NewDBPath table, which stores path information for files and directories.

    :param db_path: str, the path of the database file
    :param db_path: str, database file path
    :param table_name: str, name of the table to create
    :return: None
    """
@@ -295,39 +280,42 @@ def CreateDBPathTable(db_path='../src/db_path.db', table_name='db_path'):
    if directory and not os.path.exists(directory):
        os.makedirs(directory)

    # Connect to the SQLite database (the file is created automatically if it does not exist)
    # Connect to the SQLite database (created automatically if it does not exist)
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA foreign_keys = ON;")  # enable foreign-key support
    cursor = conn.cursor()

    # Dynamically build the CREATE TABLE statement (includes the DeviceID foreign key)
    # Dynamically build the CREATE TABLE statement
    create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
        ID INTEGER PRIMARY KEY AUTOINCREMENT,
        -- DeviceID TEXT NOT NULL,
        Path TEXT NOT NULL,
        Name TEXT NOT NULL,
        DirLayer INTEGER NOT NULL,
        PathHash TEXT UNIQUE NOT NULL,
        IsDir INTEGER NOT NULL CHECK(IsDir IN (0, 1)),
        ParentID INTEGER,
        ContentSize INTEGER,
        FileCreateTime TEXT,
        FileModifyTime TEXT,
        FileAccessTime TEXT,
        FileAuthTime TEXT,
        FileMode TEXT,

        -- Foreign-key constraints
        -- FOREIGN KEY(DeviceID) REFERENCES db_device(ID),
        -- Foreign-key constraints (optional)
        FOREIGN KEY(ParentID) REFERENCES {table_name}(ID)
    );
    """

    # Execute the SQL statement
    cursor.execute(create_table_sql)

    # Commit the changes
    conn.commit()
    conn.close()

    print(f"Table [{table_name}] was created successfully in database [{db_path}]")


def CreateDBExtendNameTable(db_path='../src/db_extend_name.db', table_name='db_extend_name'):
def CreateDBExtendNameTable(db_path='../src/db_ntfs_info.db', table_name='db_extend_name'):
    """
    Create a SQLite database at the given path and create the extension-name table in it.

fake_main.py (23 lines)
@@ -1,23 +0,0 @@
import itertools

from files_utils.files_save import CopySingleFragmentFiles, CopyMultiFragmentFiles
from files_utils.files_sort import GetSortFragments
from files_utils.folders_sort import ClassifyFilesAndFolders, ScanMultiFolders

fragment_lists = {}
target_path = r"Z:\test_files"
mix_test_data = [
    "test-copy"
]
classify_files_and_folders = ClassifyFilesAndFolders(mix_test_data)
files_list = classify_files_and_folders["files"]
folders_files_list = ScanMultiFolders(classify_files_and_folders["folders"])
merged_list = list(itertools.chain(files_list, folders_files_list))

sort_fragments = GetSortFragments(db_path="./src/db_ntfs_info.db", files_list=merged_list)
for item in sort_fragments:
    extent_count = item['extent_count']
    if extent_count == 1:
        CopySingleFragmentFiles(item, target_path=target_path)
    elif extent_count > 1:
        CopyMultiFragmentFiles(item, fragment_lists=fragment_lists, target_path=target_path)
@@ -1,131 +0,0 @@
import os


def GetVolumeLetter() -> str:
    from ntfs_utils.main import volume_letter
    return volume_letter


def CopySingleFragmentFiles(source_data_dict, target_path):
    """
    Read data from the disk at the given start byte and length, and save it as the target file.

    :param source_data_dict: dict containing the source data information
    :param target_path: target folder path
    """
    start_byte = source_data_dict.get("start_byte")
    byte_length = source_data_dict.get("length")
    absolute_path = source_data_dict.get("absolute_path")
    file_name = source_data_dict.get("filename")

    if byte_length <= 0:
        print("Error: invalid byte length")
        return

    if not absolute_path or not file_name:
        print("Error: required file information is missing")
        return

    source_disk_path = GetVolumeLetter()
    target_file_path = os.path.join(target_path, file_name)

    try:
        # Create the target directory if it does not exist
        os.makedirs(target_path, exist_ok=True)

        with open(fr"\\.\{source_disk_path}:", 'rb') as disk:
            disk.seek(start_byte)

            with open(target_file_path, 'wb') as f:
                remaining = byte_length
                CHUNK_SIZE = 1024 * 1024  # 1 MB
                while remaining > 0:
                    read_size = min(CHUNK_SIZE, remaining)
                    chunk = disk.read(read_size)
                    if not chunk:
                        print("Warning: read empty data; the end of the disk may have been reached.")
                        break
                    f.write(chunk)
                    remaining -= len(chunk)

        print(
            f"Success: read {byte_length} bytes from byte offset {start_byte}, saved as {target_file_path}")

    except PermissionError:
        print("Error: administrator privileges are required to access the disk device; run this program as administrator")
    except Exception as e:
        print(f"An error occurred: {str(e)}")


def ReadDiskBytes(volume_letter: str, start_byte: int, length: int) -> bytes:
    """
    Read the given number of bytes from the given start position of the specified volume.

    :param volume_letter: volume letter (e.g. "Y")
    :param start_byte: start byte position (int)
    :param length: number of bytes to read (int)
    :return: the raw bytes that were read
    """
    if not isinstance(volume_letter, str) or len(volume_letter.strip()) != 1:
        raise ValueError("drive_letter must be a single letter, e.g. 'Y'")

    # Build the Windows device path, e.g. \\.\Y:
    disk_path = f"\\\\.\\{volume_letter.strip().upper()}:"

    try:
        with open(disk_path, "rb") as disk:
            disk.seek(start_byte)
            data = disk.read(length)
            return data
    except PermissionError:
        raise PermissionError("Insufficient permissions; run the program as administrator")
    except Exception as e:
        raise RuntimeError(f"Failed to read the disk: {e}")


def CopyMultiFragmentFiles(
        item: dict,
        fragment_lists: dict,
        target_path: str
):
    """
    Handle a file with multiple fragments: read each fragment, concatenate the contents in order,
    and finally write the file to disk.

    :param item: dict containing the file's fragment information
    :param fragment_lists: dict storing the fragment contents of each file
    :param target_path: target path for the recovered file
    :return: None
    """
    file_name = item['filename']
    extent_count = item['extent_count']
    fragment_index = item['fragment_index']
    start_byte = item['start_byte']
    length_byte = item['length']

    volume_letter = GetVolumeLetter()

    # Read the fragment content
    fragment_content = ReadDiskBytes(volume_letter, start_byte, length_byte)

    # Initialize the list for this file if it does not exist yet
    if file_name not in fragment_lists:
        fragment_lists[file_name] = [None] * extent_count

    # Insert the content at the right position
    if fragment_index <= extent_count:
        fragment_lists[file_name][fragment_index - 1] = fragment_content
        print(f"Wrote fragment {fragment_index} of {file_name}.")
    else:
        print(f"Warning: fragment_index out of range for {file_name}: {fragment_index} / {extent_count}")

    # Check whether all fragments have been loaded
    fragments = fragment_lists[file_name]
    if None not in fragments:
        full_content = b''.join(fragments)
        target_file_path = os.path.join(target_path, file_name)
        try:
            with open(target_file_path, 'wb') as f:
                f.write(full_content)
            print(f"Successfully recovered file: {file_name}")
        except Exception as e:
            print(f"Failed to write file: {file_name}, error: {e}")
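[Note] A hedged usage sketch for ReadDiskBytes, assuming Y: is an NTFS volume and the console is elevated (raw volume access on Windows requires administrator rights):

# Read the first 512 bytes (the boot sector) of volume Y:.
boot_sector = ReadDiskBytes("Y", start_byte=0, length=512)
print(boot_sector[3:11])  # b'NTFS    ' -- the OEM ID field of an NTFS boot sector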
@@ -1,148 +0,0 @@
import sqlite3


def GetFilesDBPathInfo(db_path: str = "../src/db_ntfs_info.db",
                       table_name: str = "db_path",
                       files_path=None) -> list:
    """
    For each file path in the given list, look up the ID and Name fields of the matching record
    in the given table.

    :param db_path: database file path
    :param table_name: name of the table to query
    :param files_path: list of full file paths
    :return: list of results, each item is {'absolute_path': str, 'id': int, 'name': str}
    """
    if files_path is None:
        files_path = []
    results = []

    # Connect to the database
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    for path in files_path:
        try:
            # The table name is inserted via string formatting; parameterized queries only work for values
            sql = f"SELECT ID, Name FROM {table_name} WHERE Path = ?"
            cursor.execute(sql, (path,))
            row = cursor.fetchone()
            if row:
                results.append({
                    'absolute_path': path,
                    'id': row[0],
                    'name': row[1]
                })
            else:
                print(f"No matching record found: {path}")
        except Exception as e:
            print(f"Query failed: {path}, error: {e}")

    conn.close()
    return results


def GetFilesDBNodeInfo(db_path: str = "../src/db_ntfs_info.db", table_name: str = "db_node",
                       path_records: list = None) -> list:
    """
    Using the IDs from the db_path query results, look up the corresponding extent (fragment)
    information in the db_node table.

    :param db_path: database file path
    :param table_name: db_node table name
    :param path_records: result list from get_db_path_info
    :return: result list containing the files' fragment information
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    results = []

    for record in path_records:
        path_id = record['id']
        absolute_path = record['absolute_path']
        name = record['name']

        try:
            # Query the db_node table for the record with this PathID
            cursor.execute(f"SELECT * FROM {table_name} WHERE PathID = ?", (path_id,))
            row = cursor.fetchone()

            if not row:
                print(f"No record for PathID={path_id} found in table {table_name}")
                continue

            # Get the column names (for access by column name)
            columns = [desc[0] for desc in cursor.description]

            # Build a dict so fields can be accessed by column name
            node_data = dict(zip(columns, row))

            # Get ExtentCount
            extent_count = node_data.get("ExtentCount", 0)

            # Parse the fragment information
            fragments = []
            for i in range(1, 5):  # extent1 ~ extent4
                loc = node_data.get(f"extent{i}_Location")
                length = node_data.get(f"extent{i}_Length")

                if loc is not None and length is not None and length > 0:
                    fragments.append({
                        "start_byte": loc,
                        "length": length
                    })

            results.append({
                "absolute_path": absolute_path,
                "name": name,
                "path_id": path_id,
                "extent_count": extent_count,
                "fragments": fragments
            })

        except Exception as e:
            print(f"Query failed: PathID={path_id}, error: {e}")

    conn.close()
    return results


def SortFragmentsByStartByte(file_extents_list: list) -> list:
    """
    Sort all files' fragments by start_byte and mark each fragment's index.

    :param file_extents_list: result list returned by get_file_extents_info
    :return: list of fragments sorted by start_byte, including file path, file name, and fragment index
    """
    all_fragments = []

    for file_info in file_extents_list:
        absolute_path = file_info['absolute_path']
        filename = file_info['name']
        extent_count = file_info['extent_count']
        fragments = file_info['fragments']

        # Sort this file's fragments (they are usually already ordered)
        sorted_fragments = sorted(fragments, key=lambda x: x['start_byte'])

        # Add the fragment index
        for idx, fragment in enumerate(sorted_fragments, start=1):
            all_fragments.append({
                'absolute_path': absolute_path,
                'filename': filename,
                'extent_count': extent_count,
                'start_byte': fragment['start_byte'],
                'length': fragment['length'],
                'fragment_index': idx
            })

    # Global sort: order all fragments by start_byte
    all_fragments.sort(key=lambda x: x['start_byte'])

    return all_fragments


def GetSortFragments(db_path: str = "../src/db_ntfs_info.db", files_list: list = None) -> list:
    path_info = GetFilesDBPathInfo(db_path=db_path, table_name="db_path", files_path=files_list)
    node_info = GetFilesDBNodeInfo(db_path=db_path, table_name="db_node", path_records=path_info)
    result = SortFragmentsByStartByte(node_info)
    return result
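[Note] A quick illustration of why the global sort matters, using made-up fragment data (hypothetical values): ordering all fragments by start_byte lets the recovery loop read the disk in one forward sweep instead of seeking back and forth.

demo = [{'absolute_path': 'Y:/a.bin', 'name': 'a.bin', 'extent_count': 2,
         'fragments': [{'start_byte': 9000, 'length': 10},
                       {'start_byte': 1000, 'length': 10}]},
        {'absolute_path': 'Y:/b.bin', 'name': 'b.bin', 'extent_count': 1,
         'fragments': [{'start_byte': 5000, 'length': 10}]}]

for frag in SortFragmentsByStartByte(demo):
    print(frag['filename'], frag['fragment_index'], frag['start_byte'])
# a.bin 1 1000
# b.bin 1 5000
# a.bin 2 9000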
@@ -1,14 +0,0 @@
import subprocess

source_path = r"Y:\\test-copy"
target_path = r"Z:\\test-copy"

subprocess.run([
    "robocopy",
    source_path,
    target_path,
    "/E",     # include subdirectories
    "/R:3",   # number of retries
    "/W:1",   # wait time between retries
    "/MT:16"  # multithreaded (16 threads)
])
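[Note] One caveat when wrapping robocopy in subprocess.run: robocopy uses bitmask exit codes, and values below 8 still indicate success (e.g. 1 means "files were copied"), so check=True would raise on perfectly good runs. A hedged sketch of a safer check:

import subprocess

result = subprocess.run(["robocopy", r"Y:\test-copy", r"Z:\test-copy", "/E"])
# Exit codes 8 and above signal failures; 0-7 are informational success states.
if result.returncode >= 8:
    raise RuntimeError(f"robocopy failed with exit code {result.returncode}")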
@@ -1,263 +0,0 @@
import os
import sqlite3

from files_utils.files_sort import GetFilesDBNodeInfo, SortFragmentsByStartByte
from ntfs_utils.main import volume_letter


def GetFolderID(
        folder_path: str,
        db_path: str = "../src/db_ntfs_info.db",
        table_name: str = "db_path"
) -> int | None:
    """
    Look up the database ID of the given folder by its path.

    :param folder_path: folder path (e.g. r"CloudMusic\\")
    :param db_path: database file path
    :param table_name: name of the table to query, defaults to 'db_path'
    :return: the ID (int) on success, None on failure
    """

    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    try:
        # Build the SQL query using table_name
        sql = f"SELECT ID FROM {table_name} WHERE Path = ?"
        cursor.execute(sql, (folder_path,))
        result = cursor.fetchone()

        if result:
            return result[0]
        else:
            print(f"Path not found: {folder_path} in table {table_name}")
            return None

    except sqlite3.Error as e:
        print(f"Database operation failed: {e}")
        return None

    finally:
        conn.close()


def GetSubPathsByParentID(
        parent_id: int,
        db_path: str = "../src/db_ntfs_info.db",
        table_name: str = "db_path"
) -> list:
    """
    Query the db_path table for the children (files/folders) of the given ParentID.

    :param parent_id: parent node ID
    :param db_path: database file path
    :param table_name: table name
    :return: list of dicts containing ID, Path, and Name
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    sql = f"""
        SELECT ID, Path, Name
        FROM {table_name}
        WHERE ParentID = ?
    """

    try:
        cursor.execute(sql, (parent_id,))
        rows = cursor.fetchall()
    except Exception as e:
        print(f"Database query failed: {e}")
        return []

    results = []
    for row in rows:
        item = {
            'id': row[0],
            'absolute_path': row[1],
            'name': row[2]
        }
        results.append(item)

    conn.close()
    return results


def GetSortFragmentsByFolderPath(db_path: str = "../src/db_ntfs_info.db", folder_path: str = None) -> list:
    """
    Given a folder path, query the database for the fragment information of every file under that folder.
    :param db_path: the database to query
    :param folder_path: absolute path of the folder
    :return list: all files under the folder, ordered by fragment position
    """
    parent_id = GetFolderID(folder_path=folder_path, db_path=db_path)
    path_data = GetSubPathsByParentID(parent_id=parent_id, db_path=db_path)
    node_data = GetFilesDBNodeInfo(path_records=path_data)
    result = SortFragmentsByStartByte(node_data)

    return result


# if __name__ == "__main__":
#     folder_path_test = "pictures/"
#     data = GetSortFragmentsByFolderPath(db_path="../src/db_ntfs_info.db", folder_path=folder_path_test)
#     for item in data:
#         print(item)


def ScanDirectory(root_dir, skip_system=True):
    """
    Recursively scan the given directory and return a list of paths relative to the drive letter
    (using '/' separators), without the drive letter itself.

    :param root_dir: root directory to scan
    :param skip_system: whether to skip system directories (default True)
    :return: list of file paths in the form relative/path/to/file.ext
    """
    file_list = []

    for root, dirs, files in os.walk(root_dir):
        # Skip system directories
        if skip_system:
            dirs[:] = [d for d in dirs if not d.startswith('$') and d != "System Volume Information"]

        for file in files:
            full_path = os.path.join(root, file)

            # Strip the drive letter
            _, relative_path = os.path.splitdrive(full_path)

            # Replace \ with /
            relative_path = relative_path.lstrip("\\").replace("\\", "/")

            file_list.append(relative_path)

    return file_list


# if __name__ == "__main__":
#     folder_path = r"Y:/folder1/"
#     files_list = ScanDirectory(folder_path)
#
#     print(f"Found {len(files_list)} files:")
#     for f in files_list:
#         print(f)


def ScanMultiFolders(folder_paths_list: list, skip_system: bool = True) -> list:
    """
    Scan several root directories and return the relative paths of every file (in the form folder/file.ext).

    :param folder_paths_list: list of root directories to scan (e.g. ["CloudMusic/", "folder1/"])
    :param skip_system: whether to skip system directories
    :return: list of file paths (normalized to folder/file.ext)
    """
    all_files = []

    for root_dir in folder_paths_list:
        # Normalize the input path and make sure it ends with '/' (if it is a directory)
        normalized_root_dir = root_dir.replace("\\", "/")
        if not normalized_root_dir.endswith("/"):
            normalized_root_dir += "/"  # ensure a trailing /

        full_root_path = f"{volume_letter}:/{normalized_root_dir}"
        full_root_path = os.path.normpath(full_root_path)

        if not os.path.exists(full_root_path):
            print(f"⚠️ Path does not exist: {full_root_path}")
            continue

        for root, dirs, files in os.walk(full_root_path):
            if skip_system:
                dirs[:] = [d for d in dirs if not d.startswith('$') and d != "System Volume Information"]

            for file in files:
                full_file_path = os.path.join(root, file)

                # Strip the drive letter
                _, relative_path = os.path.splitdrive(full_file_path)

                # Trim leading/trailing '\' or '/' and replace the separators
                normalized_path = relative_path.strip("\\").strip("/").replace("\\", "/")

                all_files.append(normalized_path)

    return all_files


# if __name__ == "__main__":
#     folders = [
#         "CloudMusic\\",
#         "folder1/"
#     ]
#
#     files = ScanMultiFolders(folders)
#
#     print(f"Found {len(files)} files:")
#     for f in files:
#         print(f)


def ClassifyFilesAndFolders(paths: list) -> dict:
    """
    Classify the given list of paths into files and directories, normalizing to '/' separators.
    Directory paths are guaranteed to end with '/'.

    :param paths: list of paths (each element may be a file or a directory)
    :return: dict with 'files' and 'directories', all paths normalized to '/'
    """
    files = []
    directories = []

    for path in paths:
        # Normalize to '/' separators, keeping the original structure (trailing '/' or not)
        normalized_path = path.replace("\\", "/")

        # Check whether it was originally a directory (ends with '/' or '\')
        is_potential_dir = normalized_path.endswith("/")

        # Build the full path to check whether it exists
        full_path = f"{volume_letter}:/{normalized_path.lstrip('/')}"
        full_path = os.path.normpath(full_path)

        if os.path.isfile(full_path):
            # For files, drop any trailing /
            if normalized_path.endswith("/"):
                normalized_path = normalized_path.rstrip("/")
            files.append(normalized_path)
        elif os.path.isdir(full_path):
            # For directories, make sure there is a trailing '/'
            if not normalized_path.endswith("/"):
                normalized_path += "/"
            directories.append(normalized_path)
        else:
            print(f"⚠️ Path does not exist or its type is unknown: {normalized_path}")

    return {
        'files': files,
        'folders': directories
    }

# if __name__ == "__main__":
#     test_paths = [
#         "CloudMusic\\AGA - MIZU.mp3",
#         "CloudMusic/AGA - 一.mp3",
#         "CloudMusic/Aaron Zigman - Main Title.mp3",
#         "CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3",
#         "CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3",
#         "CloudMusic/Ava Max - Sweet but Psycho.mp3",
#         "CloudMusic\\",
#         "folder1/",
#         "CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3",
#         "CloudMusic/Color Music Choir - Something Just Like This (Live).mp3"
#     ]
#
#     result = ClassifyFilesAndFolders(test_paths)
#
#     print("✅ Files:")
#     for f in result['files']:
#         print(f)
#
#     print("\n📁 Folders:")
#     for d in result['directories']:
#         print(d)
@@ -1,15 +1,14 @@
from ntfs_utils.db_config import GetNTFSBootInfo, InsertInfoToDBConfig
from ntfs_utils.db_device import ScanSpecialVolumes, InsertVolumesToDB
from ntfs_utils.db_extend_name import InsertExtensionsToDB
from ntfs_utils.db_group import InsertGroupToDB
# from ntfs_utils.db_node import InsertNodeDataToDB
from ntfs_utils.db_path import DBPathMain
from ntfs_utils.db_user import InsertUserToDB

volume_letter = 'Y'
from db_config import GetNTFSBootInfo, InsertInfoToDBConfig
from db_device import ScanSpecialVolumes, InsertVolumesToDB
from db_extend_name import InsertExtensionsToDB
from db_group import InsertGroupToDB
from db_path import GenerateHash, ShouldSkipPath, ScanVolume, InsertPathDataToDB
from db_user import InsertUserToDB


def main():
    volume_letter = 'Z'

    # Initialize the db_config table
    config_data = GetNTFSBootInfo(volume_letter)
    InsertInfoToDBConfig(config_data)
@@ -26,6 +25,10 @@ def main():
    group_name_list = ["Copier"]
    InsertGroupToDB(group_name_list)

    # Initialize the db_path table
    scanned_data = ScanVolume(volume_letter)
    InsertPathDataToDB(scanned_data)

    # Initialize the db_extend_name table
    common_extensions = [
        "txt", "log", "csv", "xls", "xlsx", "doc", "docx",
@@ -37,12 +40,6 @@ def main():
    count = InsertExtensionsToDB(common_extensions)
    print(f"Inserted {count} new extensions.")

    # Initialize the db_path table
    DBPathMain(volume_letter=volume_letter)

    # Initialize the db_node table
    # InsertNodeDataToDB(volume_letter)


if __name__ == '__main__':
    main()
@@ -1,19 +1,18 @@
import hashlib
import os
import random
import sqlite3
from datetime import datetime

from ntfs_utils.mft_analyze import GetFile80hPattern, GetFragmentData, ExtractSequenceHexValues, hex_list_to_int
from ntfs_utils.main import volume_letter
from mft_analyze import GetFile80hPattern


# Helper: get a file's extension
# ✅ Helper: get a file's extension
def GetFileExtension(name: str) -> str:
    parts = name.rsplit('.', 1)
    return parts[1].lower() if len(parts) > 1 else ""
    if len(parts) > 1:
        return parts[1].lower()
    return ""


# Get the ExtendNameID (based on the file name's suffix)
# ✅ Function: get the ExtendNameID (based on the file name's suffix)
def GetExtendNameId(name: str, cursor: sqlite3.Cursor) -> int:
    ext = GetFileExtension(name)
    if not ext:
@@ -24,243 +23,165 @@ def GetExtendNameId(name: str, cursor: sqlite3.Cursor) -> int:
    return result[0] if result else 0


# Get the DirLayer (path depth)
def GetDirLayer(path: str) -> int:
    path = path.strip()
    if not path or path == "\\":
        return 0
    return path.count("\\") - 1


# Get the GroupID (defaults to the first one)
# ✅ Function: get the GroupID (defaults to the first one)
def GetFirstGroupId(cursor: sqlite3.Cursor) -> int:
    cursor.execute("SELECT ID FROM db_group ORDER BY ID LIMIT 1")
    result = cursor.fetchone()
    return result[0] if result else 0


# Get the UserID (defaults to the first one)
# ✅ Function: get the UserID (defaults to the first one)
def GetFirstUserId(cursor: sqlite3.Cursor) -> int:
    cursor.execute("SELECT ID FROM db_user ORDER BY ID LIMIT 1")
    result = cursor.fetchone()
    return result[0] if result else 0


def GetFilesTime(file_path):
    """
    Get the given file's creation, modification, access, and permission-change times.
    st_atime: last access time (FileAccessTime)
    st_mtime: last content-modification time (FileModifyTime)
    st_ctime: metadata change time; on Windows this is the file creation time (FileCreateTime)
    Parameters:
        file_path (str): absolute path of the file

    Returns:
        dict: FileCreateTime, FileModifyTime, FileAccessTime, and FileAuthTime as strings,
              or "default" for values that cannot be obtained.
    """
    if not os.path.exists(file_path):
        return {
            "FileCreateTime": "default",
            "FileModifyTime": "default",
            "FileAccessTime": "default",
            "FileAuthTime": "default"
        }

    try:
        stat_info = os.stat(file_path)

        def ts_to_str(timestamp):
            return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S')

        create_time = ts_to_str(stat_info.st_ctime)
        modify_time = ts_to_str(stat_info.st_mtime)
        access_time = ts_to_str(stat_info.st_atime)

        # Permission-change time; may not apply on Windows
        try:
            auth_time = ts_to_str(getattr(stat_info, 'st_birthtime', stat_info.st_ctime))
        except Exception:
            auth_time = "default"

        return {
            "FileCreateTime": create_time,
            "FileModifyTime": modify_time,
            "FileAccessTime": access_time,
            "FileAuthTime": auth_time
        }

    except Exception as e:
        print(f"❌ Failed to get file times: {e}")
        return {
            "FileCreateTime": "default",
            "FileModifyTime": "default",
            "FileAccessTime": "default",
            "FileAuthTime": "default"
        }
# ✅ [Pseudo-code] Get the file size (bytes)
def GetFileSize(full_path: str) -> int:
    return 10


# Get the device ID (first record in db_device)
# ✅ [Pseudo-code] Get the file-content hash
def GetFileHash(full_path: str) -> str:
    return hashlib.sha256(b"mocked_file_content").hexdigest()


# ✅ [Pseudo-code] Get the fragment count
def GetExtentCount(full_path: str) -> int:
    return 1


# ✅ [Pseudo-code] Get the device ID (first record in db_device)
def GetDeviceId(cursor: sqlite3.Cursor) -> int:
    cursor.execute("SELECT ID FROM db_device ORDER BY ID LIMIT 1")
    result = cursor.fetchone()
    return result[0] if result else 0


# Get the file size (mock data)
def GetFileSize(file80h_pattern):
    if not file80h_pattern or not isinstance(file80h_pattern, list):
        return 0

    if file80h_pattern[0].get('is_resident'):
        fragments = GetFragmentData(file80h_pattern)
        if fragments and len(fragments) > 0:
            return fragments[0].get('byte_length', 0)
    else:
        sequence_list = ExtractSequenceHexValues(file80h_pattern)
        if len(sequence_list) < 64:
            raise ValueError("Sequence too short to parse the file size")

        size_list = sequence_list[56:64]
        size = hex_list_to_int(size_list)
        return size
# ✅ [Pseudo-code] Get a random Location
def GetRandomLocation() -> int:
    return random.randint(1000, 9999)


# Get the file-content hash (mock data)
def GetFileHash(full_path: str) -> str:
    return hashlib.sha256(full_path.encode()).hexdigest()
# ✅ [Pseudo-code] Get a random Length
def GetRandomLength() -> int:
    return random.randint(1000, 9999)


# New: get a fragment's location and length
def GetFragmentLocation(fragment):
    return fragment.get('starting_byte', 0)
# ✅ Main function: iterate NewDBPath and insert into NewDBNode (or a custom table name)
def InsertNodeDataToDb(db_path='../src/filesystem.db', table_name='db_node'):
    """
    Iterate over the NewDBPath table and insert the corresponding Node data into the given table.


def GetFragmentLength(fragment):
    return fragment.get('byte_length', 0)


# Main function: import db_path data into db_node
def InsertNodeDataToDB(db_path='../src/db_ntfs_info.db', table_name='db_node', batch_size=20):
    Parameters:
        db_path: str, database path
        table_name: str, target table name
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    if len(volume_letter) == 1:
        volume_root = f"{volume_letter}:\\"
    elif volume_letter.endswith(':'):
        volume_root = f"{volume_letter}\\"
    else:
        volume_root = f"{volume_letter}:\\"  # accept "Y" or "Y:" as input
    try:
        # Dynamically create the table (if it does not exist)
        create_table_sql = f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            ID INTEGER PRIMARY KEY AUTOINCREMENT,
            PathID INTEGER NOT NULL,
            ExtendNameID INTEGER,
            GroupID INTEGER,
            UserID INTEGER,
            FileSize INTEGER,
            FileHash TEXT,
            ExtentCount INTEGER,
            extent1_DeviceID INTEGER,
            extent1_Location INTEGER,
            extent1_Length INTEGER,
            extent2_DeviceID INTEGER,
            extent2_Location INTEGER,
            extent2_Length INTEGER,
            extent3_DeviceID INTEGER,
            extent3_Location INTEGER,
            extent3_Length INTEGER,
            extent4_DeviceID INTEGER,
            extent4_Location INTEGER,
            extent4_Length INTEGER,

    print(f"🔍 Current volume root being processed: {volume_root}")
            -- Foreign-key constraints
            FOREIGN KEY(PathID) REFERENCES db_path(ID),
            FOREIGN KEY(ExtendNameID) REFERENCES db_extend(ID),
            FOREIGN KEY(GroupID) REFERENCES db_group(ID),
            FOREIGN KEY(UserID) REFERENCES db_user(ID)
        );
        """
        cursor.execute(create_table_sql)

    group_id = GetFirstGroupId(cursor)
    user_id = GetFirstUserId(cursor)
    device_id = GetDeviceId(cursor)
        # Fetch all NewDBPath records
        cursor.execute("SELECT ID, Name, Path, IsDir FROM db_path")
        path_records = cursor.fetchall()

    cursor.execute("SELECT ID, Path, Name, ParentID FROM db_path")
    rows = cursor.fetchall()
        batch = []
        device_id = GetDeviceId(cursor)

    insert_fields = [
        'PathID', 'ParentID', 'NameHash', 'PathHash',
        'ExtendNameID', 'DirLayer', 'GroupID', 'UserID',
        'FileCreateTime', 'FileModifyTime', 'FileAccessTime', 'FileAuthTime',
        'FileSize', 'FileMode', 'FileHash', 'ExtentCount',
        # extent fields
        "extent1_DeviceID", "extent1_Location", "extent1_Length",
        "extent2_DeviceID", "extent2_Location", "extent2_Length",
        "extent3_DeviceID", "extent3_Location", "extent3_Length",
        "extent4_DeviceID", "extent4_Location", "extent4_Length"
    ]
    insert_placeholders = ', '.join('?' * len(insert_fields))
    insert_sql = f"INSERT INTO {table_name} ({', '.join(insert_fields)}) VALUES ({insert_placeholders})"

    batch = []

    for row in rows:
        path_id, relative_path, name, parent_id = row

        full_path = os.path.join(volume_root, relative_path)

        # Check whether the same PathID already exists
        cursor.execute("SELECT COUNT(*) FROM db_node WHERE PathID = ?", (path_id,))
        exists = cursor.fetchone()[0]
        if exists > 0:
            print(f"⚠️ PathID {path_id} already exists, skipping insert")
            continue

        try:
            file80h_pattern = GetFile80hPattern(full_path)
            fragments = GetFragmentData(file80h_pattern)
            extent_count = min(len(fragments), 4)
        except Exception as e:
            print(f"⚠️ Failed to get ExtentCount, using default 0: {e}")
            fragments = []
            extent_count = 0

        # Compute the fields
        name_hash = hashlib.sha256(name.encode()).hexdigest()
        dir_layer = GetDirLayer(relative_path)
        extend_name_id = GetExtendNameId(name, cursor)

        try:
            file_size = GetFileSize(file80h_pattern)
        except Exception as e:
            print(f"⚠️ Failed to get the file size, using default 0: {e}")
            file_size = 0

        file_hash = GetFileHash(full_path)

        # Get the time information
        file_times = GetFilesTime(full_path)
        create_time = file_times["FileCreateTime"]
        modify_time = file_times["FileModifyTime"]
        access_time = file_times["FileAccessTime"]
        auth_time = file_times["FileAuthTime"]

        # Look up the PathHash
        cursor.execute("SELECT PathHash FROM db_path WHERE ID = ?", (path_id,))
        path_hash_result = cursor.fetchone()
        path_hash = path_hash_result[0] if path_hash_result else ""

        # Build the extent fields
        extent_data = []
        for i in range(4):  # at most 4 extents
            if i < len(fragments):
                frag = fragments[i]
                location = GetFragmentLocation(frag)
                length = GetFragmentLength(frag)
                extent_data.extend([device_id, location, length])
        for path_id, name, full_path, is_dir in path_records:
            if is_dir == 1:
                extend_name_id = 0
            else:
                extent_data.extend([None, None, None])
                extend_name_id = GetExtendNameId(name, cursor)

        # Build the row to insert
        values = [
            path_id, parent_id, name_hash, path_hash,
            extend_name_id, dir_layer, group_id, user_id,
            create_time, modify_time, access_time, auth_time,
            file_size, 'default', file_hash, extent_count,
            *extent_data
        ]
            group_id = GetFirstGroupId(cursor)
            user_id = GetFirstUserId(cursor)

        batch.append(values)
            file_size = GetFileSize(full_path)
            file_hash = GetFileHash(full_path)
            extent_count = GetExtentCount(full_path)

            # Build the extent data (at most 4 fragments)
            extent_data = []
            for i in range(extent_count):
                extent_data.append((device_id, GetRandomLocation(), GetRandomLength()))

            # Pad to 4 fields
            while len(extent_data) < 4:
                extent_data.append((0, 0, 0))

            # Add to the batch of rows to insert
            batch.append((
                path_id,
                extend_name_id,
                group_id,
                user_id,
                file_size,
                file_hash,
                extent_count,
                *extent_data[0],
                *extent_data[1],
                *extent_data[2],
                *extent_data[3]
            ))

        # Batch insert
        if len(batch) >= batch_size:
            cursor.executemany(insert_sql, batch)
            conn.commit()
            print(f"✅ Committed a batch of {len(batch)} records to {table_name}")
            batch.clear()
        insert_sql = f"""
        INSERT OR IGNORE INTO {table_name} (
            PathID, ExtendNameID, GroupID, UserID, FileSize, FileHash, ExtentCount,
            extent1_DeviceID, extent1_Location, extent1_Length,
            extent2_DeviceID, extent2_Location, extent2_Length,
            extent3_DeviceID, extent3_Location, extent3_Length,
            extent4_DeviceID, extent4_Location, extent4_Length
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """

    # Insert whatever is left over (less than a full batch)
    if batch:
        cursor.executemany(insert_sql, batch)
        conn.commit()
        print(f"✅ Committed the final batch of {len(batch)} records to {table_name}")
        print(f"✅ Successfully inserted {cursor.rowcount} {table_name} records")

    conn.close()
        print(f"✅ Data was successfully inserted into table {table_name}")
    except Exception as e:
        print(f"❌ Insert failed: {e}")
        conn.rollback()

    finally:
        conn.close()


# Example invocation
if __name__ == "__main__":
    InsertNodeDataToDB()
    InsertNodeDataToDb(db_path='../src/db_ntfs_info.db', table_name='db_node')
@@ -1,6 +1,49 @@
import hashlib
import os
import sqlite3
import time


def get_file_times(full_path):
    """
    Get a file's creation, modification, and access times, formatted as strings.

    Parameters:
        full_path: str, file path

    Returns:
        tuple: (create_time, modify_time, access_time, auth_time)
    """
    try:
        stat = os.stat(full_path)

        # Convert to a readable time format: YYYY-MM-DD HH:MM:SS
        def format_time(timestamp):
            return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))

        create_time = format_time(stat.st_ctime)
        modify_time = format_time(stat.st_mtime)
        access_time = format_time(stat.st_atime)
        auth_time = format_time(stat.st_ctime)  # on Windows, ctime approximates the permission-change time

        return create_time, modify_time, access_time, auth_time

    except Exception as e:
        print(f"⚠️ Failed to get times: {e}")
        return "unknown", "unknown", "unknown", "unknown"


def get_file_mode(full_path):
    """
    Get the file's permission mode (simulated on Windows).
    Could be extended based on read-only, hidden, and similar attributes.
    """
    try:
        stat = os.stat(full_path)
        # Simulate the permission as 'default' on Windows; a bit mask could also be parsed
        return "default"
    except Exception as e:
        return "unknown"


def GenerateHash(s: str) -> str:
@@ -23,26 +66,54 @@ def ShouldSkipPath(path: str) -> bool:
    return False


def ScanVolume(volume_letter: str) -> list:
def GetDirLayer(full_path: str, volume_letter: str) -> int:
    """
    Compute the directory depth from a path.

    Examples:
        Z:\demo.txt → 0
        Z:\folder\test.txt → 1
        Z:\folder\subfolder\file.txt → 2

    Parameters:
        full_path: str, full path
        volume_letter: str, volume letter (e.g. 'Z')

    Returns:
        int, the depth
    """
    root_prefix = f"{volume_letter.upper()}:\\"
    if not full_path.startswith(root_prefix):
        return -1  # invalid path

    relative_path = full_path[len(root_prefix):]
    if not relative_path:
        return 0  # the root directory has depth 0

    return len(relative_path.split(os.sep)) - 1


def ScanVolume(volume_letter: str):
    """
    Fully scan all files and directories of the given volume, skipping NTFS metafiles and system folders,
    and assign each node a ParentID.
    """
    and assign each node a ParentID and DirLayer.

    Returns:
        list of dict: a list of dicts with file/directory information
    """
    root_path = f"{volume_letter.upper()}:\\"
    if not os.path.exists(root_path):
        raise ValueError(f"Volume {root_path} does not exist")

    path_to_id = {}  # path -> ID map
    counter = 1
    result = []
    path_to_id = {}  # records the mapping from path to database ID
    counter = 1  # simulates the database's auto-increment ID

    for root, dirs, files in os.walk(root_path, topdown=True, onerror=None, followlinks=False):
        # Filter out directories that should be skipped
        dirs[:] = [d for d in dirs if not ShouldSkipPath(os.path.join(root, d))]

        entries = files + dirs

        for entry in entries:
        for entry in files + dirs:
            full_path = os.path.join(root, entry)

            if ShouldSkipPath(full_path):
@@ -59,61 +130,62 @@ def ScanVolume(volume_letter: str) -> list:
                continue

            name = entry
            path_hash = GenerateHash(full_path)

            # Split off the drive letter and normalize the path format
            _, relative_path = os.path.splitdrive(full_path)
            relative_path = relative_path.lstrip("\\").rstrip("\\")
            if os.path.isdir(full_path) and not relative_path.endswith("/"):
                relative_path += "/"

            relative_path = relative_path.replace("\\", "/")

            path_hash = GenerateHash(relative_path)

            # Compute ContentSize (KB); small files show as at least 1 KB
            content_size = bytes_size // 1024
            if content_size == 0 and bytes_size > 0:
                content_size = 1

            parent_path = os.path.dirname(full_path)
            _, parent_relative_path = os.path.splitdrive(parent_path)
            parent_relative_path = parent_relative_path.lstrip("\\").rstrip("\\")
            if os.path.isdir(parent_path) and not parent_relative_path.endswith("/"):
                parent_relative_path += "/"
            parent_relative_path = parent_relative_path.replace("\\", "/")
            parent_id = path_to_id.get(parent_path, 0)
            dir_layer = GetDirLayer(full_path, volume_letter)

            parent_id = path_to_id.get(parent_relative_path, 0)
            # ✅ Get the file's time attributes
            ctime, mtime, atime, chgtime = get_file_times(full_path)
            mode = get_file_mode(full_path)

            item = {
                "ID": counter,
                "Path": relative_path,
                "Path": full_path,
                "Name": name,
                "PathHash": path_hash,
                "IsDir": is_dir,
                "ParentID": parent_id,
                "ContentSize": content_size
                "ContentSize": content_size,
                "DirLayer": dir_layer,
                "FileCreateTime": ctime,
                "FileModifyTime": mtime,
                "FileAccessTime": atime,
                "FileAuthTime": chgtime,
                "FileMode": mode
            }

            yield item  # return each record via yield
            path_to_id[relative_path] = counter
            result.append(item)
            path_to_id[full_path] = counter
            counter += 1

        except Exception as e:
            print(f"⚠️ Skipping path {full_path}, error: {e}")

    return result

def InsertPathDataToDB(data_generator, db_path='../src/db_ntfs_info.db', table_name='db_path', batch_size=20):

def InsertPathDataToDB(data, db_path='../src/db_ntfs_info.db', table_name='db_path', batch_size=20):
    """
    Stream records into the database, inserting while the scan runs.
    Write the scan results into the NewDBPath table in batches, with support for the new fields.

    :param data_generator: an iterable (e.g. a generator)
    :param db_path: database path
    :param table_name: table name
    :param batch_size: how many records to commit at a time
    Parameters:
        data: list of dict, the scan results
        db_path: str, SQLite database path
        table_name: str, target table name
        batch_size: int, how many records per commit
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    try:
        # Create the table (if it does not exist)
        create_table_sql = f"""
        CREATE TABLE IF NOT EXISTS {table_name} (
            ID INTEGER PRIMARY KEY AUTOINCREMENT,
@@ -123,60 +195,80 @@ def InsertPathDataToDB(data_generator, db_path='../src/db_ntfs_info.db', table_n
            IsDir INTEGER NOT NULL CHECK(IsDir IN (0, 1)),
            ParentID INTEGER,
            ContentSize INTEGER,
            DirLayer INTEGER NOT NULL,
            FileCreateTime TEXT,
            FileModifyTime TEXT,
            FileAccessTime TEXT,
            FileAuthTime TEXT,
            FileMode TEXT,

            FOREIGN KEY(ParentID) REFERENCES {table_name}(ID)
        );
        """
        cursor.execute(create_table_sql)

        # Insert statement (ignores duplicate PathHash values)
        insert_sql = f"""
        INSERT OR IGNORE INTO {table_name}
        (Path, Name, PathHash, IsDir, ParentID, ContentSize)
        VALUES (?, ?, ?, ?, ?, ?)
        (Path, Name, PathHash, IsDir, ParentID, ContentSize, DirLayer, FileCreateTime, FileModifyTime, FileAccessTime, FileAuthTime, FileMode)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """

        total_inserted = 0
        batch = []

        for item in data_generator:
        for item in data:
            batch.append((
                item['Path'],
                item['Name'],
                item['PathHash'],
                item['IsDir'],
                item['ParentID'] or 0,
                item['ContentSize']
                item['ContentSize'],
                item['DirLayer'],
                item['FileCreateTime'],
                item['FileModifyTime'],
                item['FileAccessTime'],
                item['FileAuthTime'],
                item['FileMode']
            ))

            if len(batch) >= batch_size:
                cursor.executemany(insert_sql, batch)
                conn.commit()
                total_inserted += cursor.rowcount
                print(f"✅ Committed a batch of {len(batch)} records")
                batch.clear()

        # Commit whatever is left over (less than a full batch)
        # Insert the remaining records
        if batch:
            cursor.executemany(insert_sql, batch)
            conn.commit()
            total_inserted += cursor.rowcount
            print(f"✅ Committed the final batch of {len(batch)} records")

        print(f"✅ Inserted a total of {total_inserted} records into the NewDBPath table.")

    except Exception as e:
        print(f"❌ Insert failed: {e}")
        conn.rollback()

    finally:
        conn.close()


# Example main function
def DBPathMain(volume_letter: str):
def main():
    volume_letter = "Z"

    print(f"🔍 Starting full scan of volume {volume_letter}:\\ ...")
    scanned_data = ScanVolume(volume_letter)

    # Get the generator object
    generator = ScanVolume(volume_letter)
    print(f"📊 Scanned {len(scanned_data)} valid records, starting DB import...")
    InsertPathDataToDB(scanned_data)

    print(f"📊 Starting batch-by-batch DB import...")
    InsertPathDataToDB(generator)

    print("✅ Full scan and DB import complete")
    print("✅ Full scan and NewDBPath import complete")


if __name__ == "__main__":
    DBPathMain(volume_letter="Y")
    main()
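[Note] This diff replaces the yield-based ScanVolume (streamed into the database batch by batch) with one that builds a full list. Because the rewritten InsertPathDataToDB only iterates its data argument, it would still accept a generator; a minimal sketch of streaming scan-to-insert under that assumption:

def stream_scan(volume_letter: str):
    # Hypothetical wrapper: hands ScanVolume items to the insert loop one at a time,
    # so memory stays flat no matter how many files the volume holds.
    for item in ScanVolume(volume_letter):
        yield item

InsertPathDataToDB(stream_scan("Z"))  # batches of batch_size are committed as they fill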
@@ -1,8 +1,9 @@
import os
from typing import Any

import pytsk3

from ntfs_utils.db_config import GetNTFSBootInfo
from db_config import GetNTFSBootInfo


def find_file_mft_entry(fs, target_path):
@@ -60,7 +61,7 @@ def GetFileMftEntry(file_path):
    drive_letter = os.path.splitdrive(file_path)[0][0]
    device = f"\\\\.\\{drive_letter}:"

    # print(f"Opening device: {device}")
    print(f"Opening device: {device}")

    try:
        img = pytsk3.Img_Info(device)
@@ -73,10 +74,9 @@ def GetFileMftEntry(file_path):
    root_path = f"{drive_letter}:\\"
    rel_path = os.path.relpath(abs_path, root_path).replace("/", "\\")

    # print(f"Looking up MFT entry for: {rel_path}")
    print(f"Looking up MFT entry for: {rel_path}")

    mft_entry = find_file_mft_entry(fs, rel_path)
    # print(f"MFT Entry: {mft_entry}")
    if mft_entry is None:
        raise RuntimeError("Could not find MFT entry for the specified file.")

@@ -103,9 +103,7 @@ def CalculateFileMftStartSector(mft_entry, volume_letter="Z"):
    config_data = GetNTFSBootInfo(volume_letter)
    # Compute the start sector number of the file's MFT entry
    start_sector = config_data["MftPosition"] * 8 + mft_entry * 2
    if start_sector < 0:
        raise ValueError("The start sector number cannot be negative")
    # print(f"Start sector of the file's MFT entry: {start_sector}")

    return start_sector


@@ -217,192 +215,10 @@ def GetFile80hPattern(file_path):
    try:
        mft_entry_value = GetFileMftEntry(file_path)
        StartSector = CalculateFileMftStartSector(mft_entry_value, volume_letter)
        # print(f"File information and 0x80 attribute content:")
        # print(Get80hPattern(StartSector, volume_letter))
        file80h_pattern = Get80hPattern(StartSector, volume_letter)
        return file80h_pattern
        print(Get80hPattern(StartSector, volume_letter))
    except Exception as e:
        print(f"❌ Error: {e}")
        return None


# if __name__ == '__main__':
#     data = GetFile80hPattern(r"Z:\hello.txt")
#     print(data)


def ExtractSequenceHexValues(file80h_pattern):
    """
    Extract all of the 'sequence' hex strings from the given data structure and merge them into one flat list.

    Parameters:
        data (list): list of dicts, each with a 'sequence' key

    Returns:
        list: merged list of all sequence values
    """
    sequence_list = []
    for entry in file80h_pattern:
        if 'sequence' in entry:
            # Split each hex string on whitespace and merge into the result list
            for hex_str in entry['sequence']:
                # Split the string and append the parts
                sequence_list.extend(hex_str.split())
    return sequence_list


def ExportDataRunList(data_run_list):
    """
    Split data_run_list into separate, independent Data Run fragments.
    """
    result = []
    pos = 0
    while pos < len(data_run_list):
        current_byte = data_run_list[pos]
        if current_byte == '00':
            break
        try:
            header = int(current_byte, 16)
            len_bytes = (header >> 4) & 0x0F
            offset_bytes = header & 0x0F

            run_length = 1 + offset_bytes + len_bytes
            if pos + run_length > len(data_run_list):
                print(f"⚠️ Data out of bounds, stopping parse")
                break

            fragment = data_run_list[pos: pos + run_length]
            result.append(fragment)
            pos += run_length
        except Exception as e:
            print(f"❌ Failed to parse Data Run at position {pos}, error: {e}")
            pos += 1  # skip one byte and keep parsing
    return result


def hex_list_to_int(lst, byteorder='little'):
    """
    Convert a list of hex strings to an integer (supports little-endian).
    """
    if byteorder == 'little':
        lst = list(reversed(lst))
    return int(''.join(f"{int(b, 16):02x}" for b in lst), 16)
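[Note] A quick sanity check of the little-endian conversion above (illustrative values):

# ['ee', '0b'] is reversed to ['0b', 'ee'], joined to '0bee' -> 0x0bee = 3054
assert hex_list_to_int(['ee', '0b']) == 0x0bee
# Any other byteorder keeps the given order: 'ee0b' -> 0xee0b = 60939
assert hex_list_to_int(['ee', '0b'], byteorder='big') == 0xee0b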
def parse_data_run(data_run, previous_cluster=0, cluster_size=512):
    """
    Parse a single NTFS Data Run and return its start byte, end byte, and length (in bytes).

    Parameters:
        data_run (list): the Data Run as a list of hex strings
        previous_cluster (int): the previous run's last cluster number (for relative offsets)
        cluster_size (int): cluster size (default 512 bytes)

    Returns:
        dict: start byte, end byte, length, and related information
    """
    if not data_run or data_run[0] == '00':
        return None

    header = int(data_run[0], 16)
    len_bytes = (header >> 4) & 0x0F
    offset_bytes = header & 0x0F

    if len(data_run) < 1 + offset_bytes + len_bytes:
        print(f"⚠️ Not enough data to parse the Data Run")
        return None

    # Extract the offset field and the length field
    offset_data = data_run[1:1 + offset_bytes]
    length_data = data_run[1 + offset_bytes:1 + offset_bytes + len_bytes]

    # Little-endian to integer
    def hex_list_to_int(lst):
        return int(''.join(f"{int(b, 16):02x}" for b in reversed(lst)), 16)

    offset = hex_list_to_int(offset_data)
    run_length = hex_list_to_int(length_data)

    # Compute the start cluster number
    starting_cluster = previous_cluster + offset
    ending_cluster = starting_cluster + run_length - 1

    # Convert to byte offsets
    cluster_per_sector = 8
    byte_per_sector = cluster_size
    starting_byte = starting_cluster * cluster_per_sector * byte_per_sector
    byte_length = run_length * cluster_per_sector * byte_per_sector
    ending_byte = starting_byte + byte_length - 1

    return {
        "starting_byte": starting_byte,
        "ending_byte": ending_byte,
        "byte_length": byte_length,
        "starting_cluster": starting_cluster,
        "run_length_clusters": run_length
    }


def ParseMultipleDataRuns(fragments, cluster_size=512):
    """
    Parse several Data Run fragments in one pass and return their byte-offset information.

    Parameters:
        fragments (list): list of Data Run fragments
        cluster_size (int): cluster size (default 512)

    Returns:
        list: one dict of byte-offset information per fragment
    """
    results = []
    previous_starting_cluster = 0

    for fragment in fragments:
        result = parse_data_run(fragment, previous_starting_cluster, cluster_size)

        if result:
            results.append(result)
            previous_starting_cluster = result["starting_cluster"]

    return results


def GetFragmentData(file80h_pattern):
    if not file80h_pattern or not isinstance(file80h_pattern, list):
        return []

    if file80h_pattern[0].get('is_resident'):
        start_byte = file80h_pattern[0].get('start_byte')
        offset = file80h_pattern[0].get('offset')
        content_start = file80h_pattern[0].get('sequence')[2]

        content_start_list = content_start.split()
        content_len = content_start_list[::-1][4:8]
        content_offset = content_start_list[::-1][:4]

        content_len_str = ''.join(content_len)
        content_len_decimal_value = int(content_len_str, 16)
        content_offset_str = ''.join(content_offset)
        content_offset_decimal_value = int(content_offset_str, 16)

        file_offset = start_byte + offset + content_offset_decimal_value

        return [{
            'starting_byte': file_offset,
            'byte_length': content_len_decimal_value
        }]

    else:
        sequence_list = ExtractSequenceHexValues(file80h_pattern)
        data_run_offset = sequence_list[32:34][::-1]
        data_run_offset_str = ''.join(data_run_offset)
        data_run_offset_decimal_value = int(data_run_offset_str, 16)
        data_run_list = sequence_list[data_run_offset_decimal_value:]
        fragments = ExportDataRunList(data_run_list)
        results = ParseMultipleDataRuns(fragments)
        return results

# if __name__ == '__main__':
#     arri80_data = GetFile80hPattern(r"Z:\hello.txt")
#     data = GetFragmentData(arri80_data)
#     print(data)
if __name__ == '__main__':
    GetFile80hPattern(r"Z:\demo.jpg")
@@ -6,4 +6,5 @@ readme = "README.md"
requires-python = ">=3.12"
dependencies = [
    "psutil>=7.0.0",
    "pytsk3>=20250312",
]
@@ -1,139 +0,0 @@
def extract_data_run_fragments(data_run):
    """
    Extract the individual Data Runs in data_run as separate list fragments.

    Parameters:
        data_run (list): list of hex strings representing the Data Run content

    Returns:
        list: each element is a list representing one Data Run
    """
    result = []
    pos = 0

    while pos < len(data_run):
        current_byte = data_run[pos]

        if current_byte == '00':
            # Hit an empty run block; stop parsing
            break

        try:
            header = int(current_byte, 16)
            len_bytes = (header >> 4) & 0x0F
            offset_bytes = header & 0x0F

            if len_bytes == 0 or offset_bytes == 0:
                print(f"⚠️ Invalid field lengths, skipping position {pos}")
                break

            # Compute the total length of this Data Run
            run_length = 1 + offset_bytes + len_bytes

            # Slice out this Data Run
            fragment = data_run[pos: pos + run_length]

            result.append(fragment)

            # Advance the pointer
            pos += run_length

        except Exception as e:
            print(f"❌ Parse failed at position {pos}: {e}")
            break

    return result


def hex_list_to_int(lst, byteorder='little'):
    """
    Convert a list of hex strings to an integer (supports little-endian).
    """
    if byteorder == 'little':
        lst = list(reversed(lst))
    return int(''.join(f"{int(b, 16):02x}" for b in lst), 16)


def parse_data_run(data_run, previous_cluster=0):
    """
    Parse a single NTFS Data Run and return its start and end cluster numbers.

    Parameters:
        data_run (list): the Data Run as a list of hex strings
        previous_cluster (int): the previous run's last cluster number (for relative offsets)

    Returns:
        dict: start cluster, end cluster, run length, and related information
    """
    if not data_run or data_run[0] == '00':
        return None

    header = int(data_run[0], 16)
    len_bytes = (header >> 4) & 0x0F
    offset_bytes = header & 0x0F

    # Extract the offset field and the length field (note: offset first, then length)
    offset_data = data_run[1:1 + offset_bytes]
    length_data = data_run[1 + offset_bytes:1 + offset_bytes + len_bytes]

    # Parse the offset and the length
    offset = hex_list_to_int(offset_data, 'little')
    run_length = hex_list_to_int(length_data, 'little')

    # Compute the start cluster (an absolute offset for the first run, relative afterwards)
    starting_cluster = previous_cluster + offset
    ending_cluster = starting_cluster + run_length - 1

    return {
        "starting_cluster": starting_cluster,
        "ending_cluster": ending_cluster,
        "run_length": run_length
    }


def parse_multiple_data_runs(fragments):
    """
    Parse several Data Run fragments in one pass, with support for relative offsets.

    Parameters:
        fragments (list): list of Data Run fragments, e.g.:
            [
                ['31', '7a', '00', 'ee', '0b'],
                ['22', '29', '06', 'bb', '00'],
                ...
            ]

    Returns:
        list: one dict of parse results per fragment
    """
    results = []
    previous_starting_cluster = 0

    for fragment in fragments:
        result = parse_data_run(fragment, previous_starting_cluster)

        if result:
            results.append(result)
            previous_starting_cluster = result["starting_cluster"]

    return results


data_run = [
    '31', '7a', '00', 'ee', '0b',
    '22', '29', '06', 'bb', '00',
    '32', '7a', '02', 'ee', '00', '00',
    '00', 'a0', 'f8', 'ff', 'ff', 'ff', 'ff', 'ff'
]

# Step 1: extract all valid fragments
fragments = extract_data_run_fragments(data_run)
print("Extracted fragments:")
for i, frag in enumerate(fragments):
    print(f"Fragment {i + 1}: {frag}")

# Step 2: parse those fragments in one pass
results = parse_multiple_data_runs(fragments)
print("\nParse results:")
for i, res in enumerate(results):
    print(f"Fragment {i + 1}: {res}")
@@ -1,36 +0,0 @@
from files_save import CopyMultiFragmentFiles, CopySingleFragmentFiles


target_path = r"Z:\Recovered"
# Fragment contents collected for each file, keyed by file name
fragment_lists = {}
test_file_sort = [
    {'absolute_path': 'Y:\\CloudMusic\\AGA - 一.mp3', 'filename': 'AGA - 一.mp3', 'extent_count': 2,
     'start_byte': 23162880, 'length': 69632, 'fragment_index': 1},
    {'absolute_path': 'Y:\\CloudMusic\\Aaron Zigman - Main Title.mp3',
     'filename': 'Aaron Zigman - Main Title.mp3', 'extent_count': 1, 'start_byte': 687685632,
     'length': 7163904, 'fragment_index': 1},
    {'absolute_path': 'Y:\\CloudMusic\\AGA - MIZU.mp3', 'filename': 'AGA - MIZU.mp3', 'extent_count': 1,
     'start_byte': 694849536, 'length': 8126464, 'fragment_index': 1},
    {'absolute_path': 'Y:\\CloudMusic\\AGA - 一.mp3', 'filename': 'AGA - 一.mp3', 'extent_count': 2,
     'start_byte': 702976000, 'length': 10870784, 'fragment_index': 2},
    {'absolute_path': 'Y:\\CloudMusic\\Anson Seabra - Keep Your Head Up Princess.mp3',
     'filename': 'Anson Seabra - Keep Your Head Up Princess.mp3', 'extent_count': 1,
     'start_byte': 713846784, 'length': 7970816, 'fragment_index': 1},
    {'absolute_path': 'Y:\\CloudMusic\\Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3',
     'filename': 'Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3',
     'extent_count': 1, 'start_byte': 721817600, 'length': 9179136, 'fragment_index': 1},
    {'absolute_path': 'Y:\\CloudMusic\\Ava Max - Sweet but Psycho.mp3',
     'filename': 'Ava Max - Sweet but Psycho.mp3', 'extent_count': 1, 'start_byte': 731000832,
     'length': 7938048, 'fragment_index': 1},
    {'absolute_path': 'Y:\\CloudMusic\\Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3',
     'filename': 'Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3', 'extent_count': 1,
     'start_byte': 738938880, 'length': 6791168, 'fragment_index': 1},
    {'absolute_path': 'Y:\\CloudMusic\\Color Music Choir - Something Just Like This (Live).mp3',
     'filename': 'Color Music Choir - Something Just Like This (Live).mp3', 'extent_count': 1,
     'start_byte': 745730048, 'length': 6193152, 'fragment_index': 1}
]

for item in test_file_sort:
    extent_count = item['extent_count']
    if extent_count == 1:
        CopySingleFragmentFiles(item, target_path)
    elif extent_count > 1:
        CopyMultiFragmentFiles(item, fragment_lists, target_path)
@@ -1,160 +0,0 @@
import os


def ExtractVolumeLetter(path: str) -> str:
    """Extract the volume letter from an absolute path."""
    drive = os.path.splitdrive(path)[0]
    if not drive:
        raise ValueError(f"Cannot extract a volume letter from path: {path}")
    return drive[0].upper()  # returns e.g. 'Y'


def CopySingleFragmentFiles(source_data_dict, target_path):
    """
    Read data from the disk by starting byte and length, and save it as the
    target file.

    :param source_data_dict: dict with the source data information
    :param target_path: target folder path
    """
    start_byte = source_data_dict.get("start_byte")
    byte_length = source_data_dict.get("length")
    absolute_path = source_data_dict.get("absolute_path")
    file_name = source_data_dict.get("filename")

    if not absolute_path or not file_name:
        print("Error: required file information is missing")
        return

    if not byte_length or byte_length <= 0:
        print("Error: invalid byte length")
        return

    source_disk_path = ExtractVolumeLetter(absolute_path)
    target_file_path = os.path.join(target_path, file_name)

    try:
        # Create the target directory if it does not exist
        os.makedirs(target_path, exist_ok=True)

        with open(fr"\\.\{source_disk_path}:", 'rb') as disk:
            disk.seek(start_byte)

            with open(target_file_path, 'wb') as f:
                remaining = byte_length
                CHUNK_SIZE = 1024 * 1024  # 1 MB
                while remaining > 0:
                    read_size = min(CHUNK_SIZE, remaining)
                    chunk = disk.read(read_size)
                    if not chunk:
                        print("Warning: read returned no data; the end of the disk may have been reached.")
                        break
                    f.write(chunk)
                    remaining -= len(chunk)

        print(
            f"Success: read {byte_length} bytes from byte offset {start_byte}, saved as {target_file_path}")

    except PermissionError:
        print("Error: administrator privileges are required to access the disk device; please run this program as administrator")
    except Exception as e:
        print(f"An error occurred: {str(e)}")


def ReadDiskBytes(volume_letter: str, start_byte: int, length: int) -> bytes:
    """
    Read a given number of bytes from a volume, starting at a given offset.

    :param volume_letter: volume letter (e.g. "Y")
    :param start_byte: starting byte offset (int)
    :param length: number of bytes to read (int)
    :return: the raw bytes that were read
    """
    if not isinstance(volume_letter, str) or len(volume_letter.strip()) != 1:
        raise ValueError("volume_letter must be a single letter, e.g. 'Y'")

    # Build the Windows device path, e.g. \\.\Y:
    disk_path = f"\\\\.\\{volume_letter.strip().upper()}:"

    try:
        with open(disk_path, "rb") as disk:
            disk.seek(start_byte)
            data = disk.read(length)
            return data
    except PermissionError:
        raise PermissionError("Insufficient privileges; please run the program as administrator")
    except Exception as e:
        raise RuntimeError(f"Disk read failed: {e}")


# if __name__ == "__main__":
#     drive = "Y"
#     start = 687685632
#     size = 7163904
#
#     try:
#         content = ReadDiskBytes(drive, start, size)
#         print(f"Successfully read {len(content)} bytes. First 100 bytes:")
#         print(content[:100])
#     except Exception as e:
#         print("Error:", e)


def CopyMultiFragmentFiles(
        item: dict,
        fragment_lists: dict,
        target_path: str
):
    """
    Handle a file that consists of multiple fragments: read each fragment,
    assemble them in order, and finally write the result to disk.

    :param item: dict describing one fragment of the file
    :param fragment_lists: dict collecting the fragment contents per file
    :param target_path: target path for the recovered file
    :return: None
    """
    file_name = item['filename']
    extent_count = item['extent_count']
    fragment_index = item['fragment_index']
    start_byte = item['start_byte']
    length_byte = item['length']

    volume_letter = ExtractVolumeLetter(item['absolute_path'])

    # Read this fragment's content
    fragment_content = ReadDiskBytes(volume_letter, start_byte, length_byte)

    # Initialise the list for this file if it does not exist yet
    if file_name not in fragment_lists:
        fragment_lists[file_name] = [None] * extent_count

    # Insert the content at its position
    if fragment_index <= extent_count:
        fragment_lists[file_name][fragment_index - 1] = fragment_content
        print(f"Stored fragment {fragment_index} of {file_name}.")
    else:
        print(f"Warning: fragment_index out of range for {file_name}: {fragment_index} / {extent_count}")

    # Check whether every fragment has been loaded
    fragments = fragment_lists[file_name]
    if None not in fragments:
        full_content = b''.join(fragments)
        target_file_path = os.path.join(target_path, file_name)
        try:
            with open(target_file_path, 'wb') as f:
                f.write(full_content)
            print(f"Successfully recovered file: {file_name}")
        except Exception as e:
            print(f"Failed to write file {file_name}: {e}")


if __name__ == "__main__":
    test_dict = {
        'absolute_path': 'Y:\\CloudMusic\\Aaron Zigman - Main Title.mp3',
        'filename': 'Aaron Zigman - Main Title.mp3',
        'extent_count': 1,
        'start_byte': 687685632,
        'length': 7163904,
        'fragment_index': 1
    }

    CopySingleFragmentFiles(test_dict, target_path=r"Z:\RecoveredFiles")
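
    # Multi-fragment usage sketch (illustrative; the two records mirror the
    # 'AGA - 一.mp3' entries from the test data in the caller script):
    # fragment_lists = {}
    # for part in [
    #     {'absolute_path': 'Y:\\CloudMusic\\AGA - 一.mp3', 'filename': 'AGA - 一.mp3',
    #      'extent_count': 2, 'start_byte': 23162880, 'length': 69632, 'fragment_index': 1},
    #     {'absolute_path': 'Y:\\CloudMusic\\AGA - 一.mp3', 'filename': 'AGA - 一.mp3',
    #      'extent_count': 2, 'start_byte': 702976000, 'length': 10870784, 'fragment_index': 2},
    # ]:
    #     CopyMultiFragmentFiles(part, fragment_lists, target_path=r"Z:\RecoveredFiles")
    # The file is written once both fragments have been collected.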
@@ -1,232 +0,0 @@
import sqlite3


def GetFilesDBPathInfo(db_path: str = "../src/db_ntfs_info.db",
                       table_name: str = "db_path",
                       files_path=None) -> list:
    """
    Look up the ID and Name fields in the given table for each file path in
    the supplied list.

    :param db_path: database file path
    :param table_name: name of the table to query
    :param files_path: list of full file paths
    :return: result list; each item is {'absolute_path': str, 'id': int, 'name': str}
    """
    if files_path is None:
        files_path = []
    results = []

    # Connect to the database
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    for path in files_path:
        try:
            # The table name is interpolated into the string; parameterised
            # queries only apply to values
            sql = f"SELECT ID, Name FROM {table_name} WHERE Path = ?"
            cursor.execute(sql, (path,))
            row = cursor.fetchone()
            if row:
                results.append({
                    'absolute_path': path,
                    'id': row[0],
                    'name': row[1]
                })
            else:
                print(f"No matching record found: {path}")
        except Exception as e:
            print(f"Query failed for {path}: {e}")

    conn.close()
    return results


# if __name__ == "__main__":
#     test_files = [
#         r"CloudMusic/AGA - MIZU.mp3",
#         r"CloudMusic/AGA - 一.mp3",
#         r"CloudMusic/Aaron Zigman - Main Title.mp3",
#         r"CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3",
#         r"CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3",
#         r"CloudMusic/Ava Max - Sweet but Psycho.mp3",
#         r"CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3",
#         r"CloudMusic/Color Music Choir - Something Just Like This (Live).mp3"
#     ]
#
#     result = GetFilesDBPathInfo(files_path=test_files)
#     for item in result:
#         print(item)


def GetFilesDBNodeInfo(db_path: str = "../src/db_ntfs_info.db", table_name: str = "db_node",
                       path_records: list = None) -> list:
    """
    Use the IDs from the db_path query results to look up the corresponding
    extent (fragment) information in the db_node table.

    :param db_path: database file path
    :param table_name: db_node table name
    :param path_records: result list from GetFilesDBPathInfo
    :return: result list with each file's fragment information
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    results = []

    for record in path_records:
        path_id = record['id']
        absolute_path = record['absolute_path']
        name = record['name']

        try:
            # Query the db_node table for the record with this PathID
            cursor.execute(f"SELECT * FROM {table_name} WHERE PathID = ?", (path_id,))
            row = cursor.fetchone()

            if not row:
                print(f"No record with PathID={path_id} found in table {table_name}")
                continue

            # Collect the column names so fields can be accessed by name
            columns = [desc[0] for desc in cursor.description]

            # Build a dict for access by column name
            node_data = dict(zip(columns, row))

            # Read the ExtentCount
            extent_count = node_data.get("ExtentCount", 0)

            # Parse the fragment information
            fragments = []
            for i in range(1, 5):  # extent1 ~ extent4
                loc = node_data.get(f"extent{i}_Location")
                length = node_data.get(f"extent{i}_Length")

                if loc is not None and length is not None and length > 0:
                    fragments.append({
                        "start_byte": loc,
                        "length": length
                    })

            results.append({
                "absolute_path": absolute_path,
                "name": name,
                "path_id": path_id,
                "extent_count": extent_count,
                "fragments": fragments
            })

        except Exception as e:
            print(f"Query failed: PathID={path_id}, error: {e}")

    conn.close()
    return results


if __name__ == "__main__":
    test_files = [
        r"CloudMusic/AGA - MIZU.mp3",
        r"CloudMusic/AGA - 一.mp3",
        r"CloudMusic/Aaron Zigman - Main Title.mp3",
        r"CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3",
        r"CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3",
        r"CloudMusic/Ava Max - Sweet but Psycho.mp3",
        r"CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3",
        r"CloudMusic/Color Music Choir - Something Just Like This (Live).mp3"
    ]

    # Step 1: fetch the ID and Name from the db_path table
    path_info = GetFilesDBPathInfo(files_path=test_files)

    # Step 2: use the PathIDs to query fragment information from db_node
    file_extents_info = GetFilesDBNodeInfo(path_records=path_info)

    # Print the results
    for item in file_extents_info:
        print(item)


def sort_fragments_by_start_byte(file_extents_list: list) -> list:
    """
    Sort all files' fragments by start_byte and label each with its fragment
    index.

    :param file_extents_list: result list from GetFilesDBNodeInfo
    :return: fragment list sorted by start_byte, including file path, file
             name, and fragment index
    """
    all_fragments = []

    for file_info in file_extents_list:
        absolute_path = file_info['absolute_path']
        filename = file_info['name']
        extent_count = file_info['extent_count']
        fragments = file_info['fragments']

        # Sort this file's fragments (usually already in order)
        sorted_fragments = sorted(fragments, key=lambda x: x['start_byte'])

        # Attach the fragment index
        for idx, fragment in enumerate(sorted_fragments, start=1):
            all_fragments.append({
                'absolute_path': absolute_path,
                'filename': filename,
                'extent_count': extent_count,
                'start_byte': fragment['start_byte'],
                'length': fragment['length'],
                'fragment_index': idx
            })

    # Global sort: order every fragment by start_byte
    all_fragments.sort(key=lambda x: x['start_byte'])

    return all_fragments
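
# Worked example (values taken from the commented test data below): the two
# extents of 'AGA - 一.mp3' are stored as 702976000 (extent1) and 23162880
# (extent2). After sorting by start_byte the lower offset gets
# fragment_index 1, so the index reflects on-disk order rather than the
# extent order recorded in db_node:
# sort_fragments_by_start_byte([{'absolute_path': 'CloudMusic/AGA - 一.mp3',
#     'name': 'AGA - 一.mp3', 'extent_count': 2,
#     'fragments': [{'start_byte': 702976000, 'length': 10870784},
#                   {'start_byte': 23162880, 'length': 69632}]}])
# -> fragment_index 1 at start_byte 23162880, fragment_index 2 at 702976000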


# if __name__ == "__main__":
#     test_files = [
#         r"CloudMusic/AGA - MIZU.mp3",
#         r"CloudMusic/AGA - 一.mp3",
#         r"CloudMusic/Aaron Zigman - Main Title.mp3",
#         r"CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3",
#         r"CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3",
#         r"CloudMusic/Ava Max - Sweet but Psycho.mp3",
#         r"CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3",
#         r"CloudMusic/Color Music Choir - Something Just Like This (Live).mp3"
#     ]
#     test_files_sort = [
#         {'absolute_path': 'CloudMusic/AGA - MIZU.mp3', 'name': 'AGA - MIZU.mp3', 'path_id': 6, 'extent_count': 1,
#          'fragments': [{'start_byte': 694849536, 'length': 8126464}]},
#         {'absolute_path': 'CloudMusic/AGA - 一.mp3', 'name': 'AGA - 一.mp3', 'path_id': 7, 'extent_count': 2,
#          'fragments': [{'start_byte': 702976000, 'length': 10870784}, {'start_byte': 23162880, 'length': 69632}]},
#         {'absolute_path': 'CloudMusic/Aaron Zigman - Main Title.mp3', 'name': 'Aaron Zigman - Main Title.mp3',
#          'path_id': 5, 'extent_count': 1, 'fragments': [{'start_byte': 687685632, 'length': 7163904}]},
#         {'absolute_path': 'CloudMusic/Anson Seabra - Keep Your Head Up Princess.mp3',
#          'name': 'Anson Seabra - Keep Your Head Up Princess.mp3', 'path_id': 8, 'extent_count': 1,
#          'fragments': [{'start_byte': 713846784, 'length': 7970816}]},
#         {'absolute_path': 'CloudMusic/Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3',
#          'name': 'Anthony Keyrouz,Romy Wave - Something Just Like This (feat. Romy Wave).mp3', 'path_id': 9,
#          'extent_count': 1, 'fragments': [{'start_byte': 721817600, 'length': 9179136}]},
#         {'absolute_path': 'CloudMusic/Ava Max - Sweet but Psycho.mp3', 'name': 'Ava Max - Sweet but Psycho.mp3',
#          'path_id': 10, 'extent_count': 1, 'fragments': [{'start_byte': 731000832, 'length': 7938048}]},
#         {'absolute_path': 'CloudMusic/Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3',
#          'name': 'Cecilia Cheung - Turn Into Fireworks and Fall for You.mp3', 'path_id': 11, 'extent_count': 1,
#          'fragments': [{'start_byte': 738938880, 'length': 6791168}]},
#         {'absolute_path': 'CloudMusic/Color Music Choir - Something Just Like This (Live).mp3',
#          'name': 'Color Music Choir - Something Just Like This (Live).mp3', 'path_id': 12, 'extent_count': 1,
#          'fragments': [{'start_byte': 745730048, 'length': 6193152}]}]
#
#     path_info = GetFilesDBPathInfo(files_path=test_files)
#     file_extents_data = GetFilesDBNodeInfo(path_records=path_info)
#
#     # Sort the fragments of the queried files
#     single_fragment_result = sort_fragments_by_start_byte(file_extents_data)
#
#     # Simulated multi-file fragments, sorted the same way
#     multi_fragment_result = sort_fragments_by_start_byte(test_files_sort)
#
#     print("Single-file fragment sort result:")
#     for item in single_fragment_result:
#         print(item)
#
#     print("\nMulti-file fragment sort result:")
#     for item in multi_fragment_result:
#         print(item)
@@ -1,199 +0,0 @@
import os
import sqlite3

from files_sort import GetFilesDBNodeInfo, sort_fragments_by_start_byte


def GetFolderID(
        folder_path: str,
        db_path: str = "../src/db_ntfs_info.db",
        table_name: str = "db_path"
) -> int | None:
    """
    Look up the ID of the given folder path in the database.

    :param folder_path: folder path (e.g. r"CloudMusic\\")
    :param db_path: database file path
    :param table_name: name of the table to query, defaults to 'db_path'
    :return: the ID (int) on success, None on failure
    """

    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    try:
        # Build the SQL query with the table name
        sql = f"SELECT ID FROM {table_name} WHERE Path = ?"
        cursor.execute(sql, (folder_path,))
        result = cursor.fetchone()

        if result:
            return result[0]
        else:
            print(f"Path not found: {folder_path} in table {table_name}")
            return None

    except sqlite3.Error as e:
        print(f"Database operation failed: {e}")
        return None

    finally:
        conn.close()


def GetSubPathsByParentID(
        parent_id: int,
        db_path: str = "../src/db_ntfs_info.db",
        table_name: str = "db_path"
) -> list:
    """
    Query the db_path table for the children (files/folders) of a ParentID.

    :param parent_id: parent node ID
    :param db_path: database file path
    :param table_name: table name
    :return: list of dicts with ID, Path, and Name
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    sql = f"""
        SELECT ID, Path, Name
        FROM {table_name}
        WHERE ParentID = ?
    """

    try:
        cursor.execute(sql, (parent_id,))
        rows = cursor.fetchall()
    except Exception as e:
        print(f"Database query failed: {e}")
        conn.close()  # close the connection on the error path as well
        return []

    results = []
    for row in rows:
        item = {
            'id': row[0],
            'absolute_path': row[1],
            'name': row[2]
        }
        results.append(item)

    conn.close()
    return results


if __name__ == "__main__":
    test_folder_path = "pictures/"
    parent_id_test = GetFolderID(test_folder_path)
    # node_data = GetNodeFragmentsByParentID(parent_id_test)
    path_data = GetSubPathsByParentID(parent_id_test)
    node_data = GetFilesDBNodeInfo(path_records=path_data)
    for data in node_data:
        print(data)


def GetSortFragmentsByFolderPath(db_path: str = "../src/db_ntfs_info.db", folder_path: str = None) -> list:
    """
    Query the fragment information of every file under the given folder.

    :param db_path: database to query
    :param folder_path: absolute path of the folder
    :return list: the folder's files, ordered by fragment position
    """
    parent_id = GetFolderID(folder_path=folder_path, db_path=db_path)
    path_data = GetSubPathsByParentID(parent_id=parent_id, db_path=db_path)
    node_data = GetFilesDBNodeInfo(path_records=path_data)
    result = sort_fragments_by_start_byte(node_data)

    return result


# if __name__ == "__main__":
#     folder_path_test = "pictures/"
#     data = GetSortFragmentsByFolderPath(db_path="../src/db_ntfs_info.db", folder_path=folder_path_test)
#     for item in data:
#         print(item)


def ScanDirectory(root_dir, skip_system=True):
    """
    Recursively scan a directory and return file paths relative to the
    volume (using '/' as separator), without the drive letter.

    :param root_dir: root directory to scan
    :param skip_system: whether to skip system directories (default True)
    :return: list of file paths in the form relative/path/to/file.ext
    """
    file_list = []

    for root, dirs, files in os.walk(root_dir):
        # Skip system directories
        if skip_system:
            dirs[:] = [d for d in dirs if not d.startswith('$') and d != "System Volume Information"]

        for file in files:
            full_path = os.path.join(root, file)

            # Drop the drive letter
            _, relative_path = os.path.splitdrive(full_path)

            # Replace \ with /
            relative_path = relative_path.lstrip("\\").replace("\\", "/")

            file_list.append(relative_path)

    return file_list


# if __name__ == "__main__":
#     folder_path = r"Y:/folder1/"
#     files_list = ScanDirectory(folder_path)
#
#     print(f"Found {len(files_list)} files:")
#     for f in files_list:
#         print(f)


def ScanMultiFolders(folder_paths, skip_system=True):
    """
    Scan several root directories and return the file paths found in all of
    their subdirectories.

    :param folder_paths: list of root directories
    :param skip_system: whether to skip system directories (default True)
    :return: list of relative file paths (in the form folder/file.ext)
    """
    all_files = []

    for root_dir in folder_paths:
        # Make sure the path exists
        if not os.path.exists(root_dir):
            print(f"⚠️ Path does not exist: {root_dir}")
            continue

        for root, dirs, files in os.walk(root_dir):
            # Skip system directories
            if skip_system:
                dirs[:] = [d for d in dirs if not d.startswith('$') and d != "System Volume Information"]

            for file in files:
                full_path = os.path.join(root, file)

                # Drop the drive letter
                _, relative_path = os.path.splitdrive(full_path)
                relative_path = relative_path.lstrip("\\").replace("\\", "/")

                all_files.append(relative_path)

    return all_files


if __name__ == "__main__":
    folders = [
        r"Y:\CloudMusic",
        r"Y:\folder1"
    ]

    files = ScanMultiFolders(folders)

    print(f"Found {len(files)} files:")
    for f in files:
        print(f)
@@ -1,92 +0,0 @@
def analyze_ntfs_data_attribute(data):
    """
    Analyse the 0x80 ($DATA) attribute in an NTFS structure and return the
    file's fragment count.

    Parameters:
        data (list): list of dicts, each of which must have a 'sequence' key
            (see the sample structure below)

    Returns:
        int: fragment count (1 for a resident attribute, otherwise the
            number of data runs in the non-resident attribute)

    Raises:
        ValueError: if the input data is invalid
    """
    # Step 1: pull out and flatten the sequence data
    hex_bytes = []
    for entry in data:
        if 'sequence' in entry:
            for hex_str in entry['sequence']:
                hex_bytes.extend(hex_str.split())

    print(hex_bytes)  # debug output of the raw hex byte list

    # Convert the hex strings to a list of integers
    try:
        attribute_data = [int(x, 16) for x in hex_bytes]
    except ValueError:
        raise ValueError("Invalid hexadecimal data")

    # Step 2: analyse the attribute structure
    if len(attribute_data) < 24:
        raise ValueError("Attribute data too short to parse the header")

    # Check the attribute type (0x80)
    if attribute_data[0] != 0x80:
        raise ValueError("Not a 0x80 ($DATA) attribute")

    # Check the resident flag (offset 0x08)
    is_resident = attribute_data[8] == 0

    if is_resident:
        return 1
    else:
        # Parse the data run list of the non-resident attribute
        data_run_offset = attribute_data[0x20] | (attribute_data[0x21] << 8)

        if data_run_offset >= len(attribute_data):
            raise ValueError("Data run offset exceeds the attribute length")

        data_runs = attribute_data[data_run_offset:]
        fragment_count = 0
        pos = 0

        while pos < len(data_runs):
            header_byte = data_runs[pos]
            if header_byte == 0x00:
                break

            # High nibble: offset field size; low nibble: length field size
            offset_field_size = (header_byte >> 4) & 0x0F
            length_field_size = header_byte & 0x0F

            if offset_field_size == 0 or length_field_size == 0:
                break

            pos += 1 + length_field_size + offset_field_size
            fragment_count += 1

        return fragment_count


input_data = [
    {
        'start_byte': 3221267456,
        'offset': 264,
        'sequence': [
            '80 00 00 00 48 00 00 00',
            '01 00 00 00 00 00 01 00',
            '00 00 00 00 00 00 00 00',
            '79 00 00 00 00 00 00 00',
            '40 00 00 00 00 00 00 00',
            '00 a0 07 00 00 00 00 00',
            '0b 93 07 00 00 00 00 00',
            '0b 93 07 00 00 00 00 00',
            '31 7a 00 ee 0b 00 00 00'
        ],
        'is_resident': False,
        'total_groups': 9,
        'attribute_length': 72
    }
]

print(analyze_ntfs_data_attribute(input_data))  # prints the fragment count
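
# For the sample above the attribute is non-resident and its data-run area
# (at offset 0x40) holds a single run '31 7a 00 ee 0b' followed by a 0x00
# terminator, so this prints 1.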
@@ -1,105 +0,0 @@
def ParseDataRuns(data_bytes: list, cluster_size=512):
    """
    Parse the data runs in an NTFS $80 attribute and return the starting
    byte and byte length of each fragment.

    Parameters:
        data_bytes (list): list of hex strings holding the full $80 attribute.
        cluster_size (int): cluster size in bytes (default 512)

    Returns:
        dict: per-fragment information, shaped as follows (the sample values
        assume cluster_size=4096):
        {
            "is_resident": False,
            "data_runs": {
                "Fragment 1": {"starting_byte": 3202351104, "byte_length": 499712 - 1},
                "Fragment 2": {...}
            }
        }
    """

    def hex_list_to_int(lst, length, byteorder='little'):
        """Take the given number of bytes from the list and convert them to an integer."""
        bytes_data = bytes([int(x, 16) for x in lst[:length]])
        return int.from_bytes(bytes_data, byteorder=byteorder)

    result = {
        "is_resident": True,
        "data_runs": {}
    }

    # Check that this is a $80 attribute
    if data_bytes[0] != '80':
        raise ValueError("Not a $80 attribute")

    # The resident flag is at offset 0x08 (the 9th byte)
    is_resident = data_bytes[8] == '00'
    result["is_resident"] = is_resident

    if is_resident:
        result["data_runs"]["resident file"] = {
            "starting_byte": 0,
            "byte_length": "file is resident; no fragments"
        }
        return result

    # Non-resident attribute: the data-run offset is a WORD at offset 0x20
    data_run_offset = hex_list_to_int(data_bytes[0x20:0x20 + 2], 2)
    if data_run_offset >= len(data_bytes):
        raise ValueError("Data run offset out of range")

    # Slice out the data-run section
    data_run_bytes = data_bytes[data_run_offset:]
    pos = 0
    fragment_index = 1

    while pos < len(data_run_bytes):
        header_byte = int(data_run_bytes[pos], 16)
        if header_byte == 0x00:
            break

        # High nibble: size of the offset field; low nibble: size of the
        # length field. The length field precedes the offset field.
        offset_len = (header_byte >> 4) & 0x0F
        len_len = header_byte & 0x0F

        if len_len == 0 or offset_len == 0:
            break

        pos += 1

        # Extract the run length (little-endian)
        length_bytes = data_run_bytes[pos:pos + len_len]
        length = hex_list_to_int(length_bytes, len_len, byteorder='little')

        # Extract the cluster offset (little-endian)
        offset_bytes = data_run_bytes[pos + len_len:pos + len_len + offset_len]
        offset = hex_list_to_int(offset_bytes, offset_len, byteorder='little')

        # Starting byte = offset * cluster_size
        start_byte = offset * cluster_size
        byte_length = length * cluster_size - 1

        result["data_runs"][f"Fragment {fragment_index}"] = {
            "starting_byte": start_byte,
            "byte_length": byte_length
        }

        pos += len_len + offset_len
        fragment_index += 1

    return result


input_data = [
    '80', '00', '00', '00', '48', '00', '00', '00',
    '01', '00', '00', '00', '00', '00', '01', '00',
    '00', '00', '00', '00', '00', '00', '00', '00',
    '79', '00', '00', '00', '00', '00', '00', '00',
    '40', '00', '00', '00', '00', '00', '00', '00',
    '00', 'a0', '07', '00', '00', '00', '00', '00',
    '0b', '93', '07', '00', '00', '00', '00', '00',
    '0b', '93', '07', '00', '00', '00', '00', '00',
    '31', '7a', '00', 'ee', '0b', '00', '00', '00'
]

result = ParseDataRuns(input_data)
print(result)
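
# The single data run above is '31 7a 00 ee 0b': length field 0x7a = 122
# clusters, offset field 0x0bee00 = cluster 781824. With the default
# 512-byte clusters this prints:
# {'is_resident': False, 'data_runs': {'Fragment 1': {'starting_byte': 400293888, 'byte_length': 62463}}}
# With cluster_size=4096 the values match the docstring example:
# ParseDataRuns(input_data, cluster_size=4096)
# -> starting_byte 3202351104, byte_length 499711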
12
uv.lock
generated
@@ -8,10 +8,14 @@ version = "0.1.0"
source = { virtual = "." }
dependencies = [
    { name = "psutil" },
    { name = "pytsk3" },
]

[package.metadata]
requires-dist = [{ name = "psutil", specifier = ">=7.0.0" }]
requires-dist = [
    { name = "psutil", specifier = ">=7.0.0" },
    { name = "pytsk3", specifier = ">=20250312" },
]

[[package]]
name = "psutil"
@@ -27,3 +31,9 @@ wheels = [
    { url = "https://files.pythonhosted.org/packages/50/e6/eecf58810b9d12e6427369784efe814a1eec0f492084ce8eb8f4d89d6d61/psutil-7.0.0-cp37-abi3-win32.whl", hash = "sha256:ba3fcef7523064a6c9da440fc4d6bd07da93ac726b5733c29027d7dc95b39d99", size = 241053, upload-time = "2025-02-13T21:54:34.31Z" },
    { url = "https://files.pythonhosted.org/packages/50/1b/6921afe68c74868b4c9fa424dad3be35b095e16687989ebbb50ce4fceb7c/psutil-7.0.0-cp37-abi3-win_amd64.whl", hash = "sha256:4cf3d4eb1aa9b348dec30105c55cd9b7d4629285735a102beb4441e38db90553", size = 244885, upload-time = "2025-02-13T21:54:37.486Z" },
]

[[package]]
name = "pytsk3"
version = "20250312"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/59/3f/2d440547eecca1786c2178a3e010e7fb61da1f0468d9809ff2b5b8fbb39b/pytsk3-20250312.tar.gz", hash = "sha256:bb47d4aa5976adbc8d4350bed719b771c548139bc8efe761e1d081aa99074c1b", size = 5274913, upload-time = "2025-03-12T05:49:14.937Z" }