restore mft_analyze

This commit is contained in:
Burgess Leo
2025-05-19 11:03:36 +08:00
parent e167ff5d9f
commit b2e14fdbe0
3 changed files with 249 additions and 7 deletions

View File

@@ -22,9 +22,13 @@ def GetExtendNameId(name: str, cursor: sqlite3.Cursor) -> int:
# 获取 DirLayer路径层级
def GetDirLayer(path: str) -> int:
    r"""
    Return the directory depth of a backslash-separated path.

    Examples:
        "Z:\demo.jpg"           -> 0 (file in the root directory)
        "Z:\pictures\RHCE.jpg"  -> 1 (one sub-directory deep)

    Args:
        path: Path string; surrounding whitespace is ignored.

    Returns:
        The number of backslashes minus the one that follows the drive
        root, never less than 0. Empty input and a lone backslash yield 0.
    """
    path = path.strip()
    if not path or path == "\\":
        return 0
    # The first backslash belongs to the root ("Z:\"), so it is not a level.
    # Clamp so a bare name without any backslash does not produce -1.
    return max(0, path.count("\\") - 1)
# 获取 GroupID默认第一个
@@ -74,7 +78,7 @@ def GetRandomLength() -> int:
# 主函数:将 db_path 数据导入 db_node
def MigratePathToNode(db_path='../src/db_ntfs_info.db'):
def InsertNodeDataToDB(db_path='../src/db_ntfs_info.db', table_name='db_node'):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
@@ -85,9 +89,18 @@ def MigratePathToNode(db_path='../src/db_ntfs_info.db'):
cursor.execute("SELECT ID, Path, Name, ParentID FROM db_path")
rows = cursor.fetchall()
inserted_count = 0 # 新增:记录实际插入的条目数
for row in rows:
path_id, full_path, name, parent_id = row
# 检查是否已存在相同 PathID
cursor.execute("SELECT COUNT(*) FROM db_node WHERE PathID = ?", (path_id,))
exists = cursor.fetchone()[0]
if exists > 0:
print(f"⚠️ PathID {path_id} 已存在,跳过插入")
continue
# 计算字段
name_hash = hashlib.sha256(name.encode()).hexdigest()
dir_layer = GetDirLayer(full_path)
@@ -136,15 +149,21 @@ def MigratePathToNode(db_path='../src/db_ntfs_info.db'):
# 构建 SQL 插入语句
placeholders = ', '.join('?' * len(values))
insert_sql = f"INSERT INTO db_node ({', '.join(fields)}) VALUES ({placeholders})"
insert_sql = f"INSERT INTO {table_name} ({', '.join(fields)}) VALUES ({placeholders})"
# 执行插入
cursor.execute(insert_sql, values)
inserted_count += 1 # 新增成功插入后计数器加1
conn.commit()
conn.close()
print("✅ db_path 数据已成功迁移到 db_node 表")
# 新增:根据插入结果输出不同信息
if inserted_count > 0:
print(f"✅ 成功插入 {inserted_count} 条数据到 {table_name}")
else:
print(" 没有新的数据被插入数据库(可能所有条目已存在或没有可处理的数据)")
if __name__ == '__main__':
MigratePathToNode()
InsertNodeDataToDB()

223
ntfs_utils/mft_analyze.py Normal file
View File

@@ -0,0 +1,223 @@
import os
import pytsk3
from db_config import GetNTFSBootInfo
def find_file_mft_entry(fs, target_path):
    r"""
    Look up the MFT entry number of a file or directory by its path.

    The path is split on backslashes and matched case-insensitively, one
    component per directory level, starting at the file system root.

    Args:
        fs: Opened file system object exposing ``open_dir(inode=...)`` and
            ``info.root_inum`` (``GetFileMftEntry`` passes a
            ``pytsk3.FS_Info``).
        target_path (str): Backslash-separated path relative to the volume
            root, e.g. ``pictures\RHCE.jpg``. Leading, trailing and doubled
            separators are ignored.

    Returns:
        int | None: ``meta.addr`` of the matching entry, the root inode when
        the path has no components, or ``None`` when nothing matches or a
        directory cannot be opened.
    """

    def traverse_directory(inode, path_components):
        if not path_components:
            return inode

        wanted = path_components[0]
        is_last = len(path_components) == 1

        try:
            directory = fs.open_dir(inode=inode)
        except Exception as e:
            print(f"Error opening directory with inode {inode}: {e}")
            return None

        for entry in directory:
            info = entry.info
            if not info or not info.name or not info.meta:
                continue

            name = info.name.name.decode('utf-8', errors='ignore').lower()
            if name != wanted:
                continue

            meta = info.meta
            if is_last:
                # Reached the target file/directory itself.
                return meta.addr
            if meta.type == pytsk3.TSK_FS_META_TYPE_DIR:
                # Descend; keep scanning siblings if this subtree misses.
                result = traverse_directory(meta.addr, path_components[1:])
                # Compare against None: inode 0 is a valid entry number.
                if result is not None:
                    return result

        return None

    # Drop empty components so "", "\\a\\" and "a\\\\b" behave sensibly.
    path_parts = [part for part in target_path.lower().split("\\") if part]
    return traverse_directory(fs.info.root_inum, path_parts)
def GetFileMftEntry(file_path):
    """
    Return the MFT entry number of an existing file.

    The raw volume device of the file's drive is opened with pytsk3 and the
    file's path relative to the drive root is resolved through
    ``find_file_mft_entry``.

    Args:
        file_path (str): Path of an existing file; may be relative.

    Returns:
        int: The MFT entry number reported for the file.

    Raises:
        FileNotFoundError: ``file_path`` does not exist.
        ValueError: No single drive letter can be derived from the path.
        RuntimeError: The device cannot be opened, or no entry is found.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    # Resolve first: a relative path carries no drive prefix of its own.
    abs_path = os.path.abspath(file_path)
    drive = os.path.splitdrive(abs_path)[0]
    if len(drive) != 2 or drive[1] != ":":
        raise ValueError(f"Cannot determine a drive letter for: {file_path}")

    drive_letter = drive[0]
    device = f"\\\\.\\{drive_letter}:"
    print(f"Opening device: {device}")

    try:
        img = pytsk3.Img_Info(device)
        fs = pytsk3.FS_Info(img)
    except Exception as e:
        # Chain the cause so the underlying pytsk3 error stays visible.
        raise RuntimeError(f"Failed to open device '{device}': {e}") from e

    # Path relative to the drive root, always backslash-separated.
    root_path = f"{drive_letter}:\\"
    rel_path = os.path.relpath(abs_path, root_path).replace("/", "\\")
    print(f"Looking up MFT entry for: {rel_path}")

    mft_entry = find_file_mft_entry(fs, rel_path)
    if mft_entry is None:
        raise RuntimeError("Could not find MFT entry for the specified file.")

    return mft_entry
def CalculateFileMftStartSector(mft_entry, volume_letter="Z",
                                sectors_per_cluster=8, sectors_per_entry=2):
    """
    Compute the starting sector of a file's MFT entry on a volume.

    The result is
    ``MftPosition * sectors_per_cluster + mft_entry * sectors_per_entry``,
    where ``MftPosition`` is read from ``GetNTFSBootInfo(volume_letter)``.

    Args:
        mft_entry (int): MFT entry number (inode) of the file.
        volume_letter (str): Drive letter passed to ``GetNTFSBootInfo``.
        sectors_per_cluster (int): Multiplier applied to ``MftPosition``
            (presumably the $MFT start expressed in clusters - confirm
            against ``GetNTFSBootInfo``). Defaults to 8.
        sectors_per_entry (int): Sectors occupied by one MFT entry.
            Defaults to 2 (a 1024-byte entry on 512-byte sectors).

    Returns:
        int: Starting sector number of the file's MFT entry.

    Raises:
        ValueError: ``mft_entry`` is negative.
    """
    if mft_entry < 0:
        raise ValueError("MFT Entry 编号不能为负数")

    # Boot-sector information for the volume.
    config_data = GetNTFSBootInfo(volume_letter)

    mft_start_sector = config_data["MftPosition"] * sectors_per_cluster
    return mft_start_sector + mft_entry * sectors_per_entry
def Get80hPattern(sector_number, volume_letter="Z"):
    r"""
    Read one sector of a volume and collect the 0x80-prefixed byte runs in it.

    The sector is split into 8-byte groups. Every full group whose first
    byte is 0x80 is treated as the start of an attribute: bytes 4-7 of that
    group give the attribute length (little-endian), and that many bytes,
    rounded up to whole groups, are captured.

    Args:
        sector_number (int): Sector to read (512-byte sectors).
        volume_letter (str): Drive letter; the device ``\\.\<letter>:`` is
            opened for reading.

    Returns:
        list[dict]: One dict per match with the keys
            'start_byte'       - byte position of the sector
                                 (sector_number * 512),
            'offset'           - offset of the match inside the sector,
            'sequence'         - captured groups, each formatted like
                                 "xx xx xx ...",
            'is_resident'      - True when bit 0 of the second group's first
                                 byte is clear,
            'total_groups'     - number of groups captured,
            'attribute_length' - attribute length in bytes.
        Errors are printed and whatever was collected so far (possibly an
        empty list) is returned.
    """
    SECTOR_SIZE = 512
    GROUP_SIZE = 8      # the sector is scanned in 8-byte groups
    MATCH_BYTE = 0x80   # first byte that marks a candidate attribute

    device_path = fr"\\.\{volume_letter}:"
    matches = []

    try:
        with open(device_path, 'rb') as disk:
            disk.seek(sector_number * SECTOR_SIZE)
            sector_data = disk.read(SECTOR_SIZE)

            if len(sector_data) < GROUP_SIZE:
                print(f"错误: 无法读取扇区 {sector_number}")
                return matches

            groups = [
                sector_data[pos:pos + GROUP_SIZE]
                for pos in range(0, len(sector_data), GROUP_SIZE)
            ]
            group_count = len(groups)

            for index, group in enumerate(groups):
                # Only full groups that start with the marker byte qualify.
                if len(group) < GROUP_SIZE or group[0] != MATCH_BYTE:
                    continue

                offset = index * GROUP_SIZE

                # A following group is required before the match is used.
                if index + 1 >= group_count:
                    print(f"警告: 当前组后不足两组,跳过偏移量 {offset:04X}h")
                    continue

                # Bytes 4-7 of the first group: little-endian DWORD length.
                attribute_length = int.from_bytes(group[4:8], byteorder='little')

                # Round the length up to whole 8-byte groups.
                total_groups = (attribute_length + GROUP_SIZE - 1) // GROUP_SIZE
                stop = index + total_groups
                if stop > group_count:
                    print(f"警告: 属性越界,跳过偏移量 {offset:04X}h")
                    continue

                captured = groups[index:stop]
                formatted_sequence = [
                    ' '.join(f"{byte:02x}" for byte in chunk)
                    for chunk in captured
                ]

                # Resident when bit 0 of the second group's first byte is 0.
                is_resident = (
                    len(captured) >= 2
                    and (captured[1][0] & 0x01) == 0x00
                )

                matches.append({
                    'start_byte': sector_number * SECTOR_SIZE,
                    'offset': offset,
                    'sequence': formatted_sequence,
                    'is_resident': is_resident,
                    'total_groups': total_groups,
                    'attribute_length': attribute_length
                })

    except PermissionError:
        print("错误: 需要管理员权限访问磁盘设备")
    except Exception as e:
        print(f"发生错误: {str(e)}")

    return matches
def GetFile80hPattern(file_path):
    """
    Locate a file's MFT entry and print the 0x80 matches found in it.

    Chains ``GetFileMftEntry`` -> ``CalculateFileMftStartSector`` ->
    ``Get80hPattern`` for the volume that holds ``file_path``.

    Args:
        file_path (str): Path of the file to inspect.

    Returns:
        list | None: The list produced by ``Get80hPattern`` (also printed),
        or ``None`` when any step fails; the error is printed in that case.
    """
    try:
        # Derive the drive the same way GetFileMftEntry does; fall back to
        # the text before the first colon when no drive prefix is available.
        drive = os.path.splitdrive(os.path.abspath(file_path))[0]
        if len(drive) == 2 and drive[1] == ":":
            volume_letter = drive[0]
        else:
            volume_letter = file_path.split(':')[0]

        mft_entry_value = GetFileMftEntry(file_path)
        StartSector = CalculateFileMftStartSector(mft_entry_value, volume_letter)
        patterns = Get80hPattern(StartSector, volume_letter)
        print(patterns)
        return patterns
    except Exception as e:
        print(f"❌ Error: {e}")
        return None
# Script entry point: run the 0x80 lookup for a sample file on drive Z:.
if __name__ == '__main__':
    GetFile80hPattern(r"Z:\demo.jpg")

Binary file not shown.