Files
fastcopy/ntfs_utils/mft_analyze.py
2025-05-20 16:26:58 +08:00

424 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import pytsk3
from db_config import GetNTFSBootInfo
def find_file_mft_entry(fs, target_path):
    """
    Look up the MFT entry number of a file or directory by its path.

    The path is split on backslashes and matched case-insensitively, one
    component per directory level, starting at ``fs.info.root_inum``.

    Parameters:
        fs: an opened file-system object exposing ``info.root_inum`` and
            ``open_dir(inode=...)`` (the caller in this file builds it with
            ``pytsk3.FS_Info``).
        target_path (str): backslash-separated path relative to the volume
            root. Leading, trailing and doubled backslashes are ignored.

    Returns:
        int | None: ``meta.addr`` of the matching entry, the root inode when
        the path contains no components, or None when nothing matches or a
        directory on the way cannot be opened.
    """
    def traverse_directory(inode, path_components):
        # Every component has been consumed: ``inode`` is the answer.
        if not path_components:
            return inode
        wanted = path_components[0]
        is_last = len(path_components) == 1
        try:
            directory = fs.open_dir(inode=inode)
        except Exception as e:
            print(f"Error opening directory with inode {inode}: {e}")
            return None
        for entry in directory:
            info = entry.info
            # Skip entries that lack a name or metadata.
            if not info or not info.name or not info.meta:
                continue
            name = info.name.name.decode('utf-8', errors='ignore').lower()
            if name != wanted:
                continue
            meta = info.meta
            if is_last:
                # This is the target file/directory.
                return meta.addr
            if meta.type == pytsk3.TSK_FS_META_TYPE_DIR:
                # Descend into the matching sub-directory.
                result = traverse_directory(meta.addr, path_components[1:])
                # Compare against None so that an entry number of 0 is not
                # mistaken for "not found".
                if result is not None:
                    return result
        return None

    # Lower-case once here and drop empty components, which could never
    # match a directory entry name.
    path_parts = [part for part in target_path.lower().split("\\") if part]
    return traverse_directory(fs.info.root_inum, path_parts)
def GetFileMftEntry(file_path):
    """
    Return the MFT entry number of an existing file on a drive-letter volume.

    The raw volume device for the file's drive letter is opened through
    pytsk3, and the file is then located with find_file_mft_entry using its
    path relative to the drive root.

    Parameters:
        file_path (str): path of the file; may be relative, it is resolved
            with os.path.abspath before the drive letter is extracted.

    Returns:
        int: the MFT entry number reported for the file.

    Raises:
        FileNotFoundError: file_path does not exist.
        ValueError: the resolved path does not start with a drive letter.
        RuntimeError: the device cannot be opened, or no entry is found.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    # Resolve first: os.path.splitdrive returns '' for a relative path, so
    # indexing into the drive of the raw input would raise IndexError.
    abs_path = os.path.abspath(file_path)
    drive = os.path.splitdrive(abs_path)[0]
    if len(drive) != 2 or drive[1] != ":":
        raise ValueError(f"Path is not on a drive-letter volume: {file_path}")
    drive_letter = drive[0]
    device = f"\\\\.\\{drive_letter}:"

    try:
        img = pytsk3.Img_Info(device)
        fs = pytsk3.FS_Info(img)
    except Exception as e:
        # Chain the cause so the underlying pytsk3 error stays visible.
        raise RuntimeError(f"Failed to open device '{device}': {e}") from e

    # Build the path relative to the drive root, using backslashes.
    root_path = f"{drive_letter}:\\"
    rel_path = os.path.relpath(abs_path, root_path).replace("/", "\\")

    mft_entry = find_file_mft_entry(fs, rel_path)
    if mft_entry is None:
        raise RuntimeError("Could not find MFT entry for the specified file.")
    return mft_entry
def CalculateFileMftStartSector(mft_entry, volume_letter="Z",
                                sectors_per_cluster=8, sectors_per_mft_entry=2):
    """
    Compute the starting sector of a file's MFT entry on a volume.

    The result is
    ``MftPosition * sectors_per_cluster + mft_entry * sectors_per_mft_entry``
    where ``MftPosition`` comes from ``GetNTFSBootInfo(volume_letter)``.

    Parameters:
        mft_entry (int): MFT entry number of the file; must not be negative.
        volume_letter (str): drive letter passed to GetNTFSBootInfo.
        sectors_per_cluster (int): multiplier applied to ``MftPosition``.
            The default 8 is the value that used to be hard-coded.
        sectors_per_mft_entry (int): sectors occupied by one MFT entry. The
            default 2 is the value that used to be hard-coded.

    Returns:
        int: starting sector number of the file's MFT entry.

    Raises:
        ValueError: mft_entry or the computed sector number is negative.

    NOTE(review): ``MftPosition`` is presumably the starting cluster of $MFT
    read from the boot sector, and the formula presumably assumes $MFT is
    stored contiguously - confirm against db_config.GetNTFSBootInfo.
    """
    if mft_entry < 0:
        raise ValueError("MFT Entry 编号不能为负数")

    # Fetch the NTFS boot information for the volume.
    config_data = GetNTFSBootInfo(volume_letter)

    start_sector = (config_data["MftPosition"] * sectors_per_cluster
                    + mft_entry * sectors_per_mft_entry)
    if start_sector < 0:
        raise ValueError("起始扇区号不能为负数")
    return start_sector
def _scan_80h_attributes(sector_data, base_byte, group_size=8):
    """
    Collect every 0x80 attribute header that starts on a group boundary.

    sector_data is cut into groups of group_size bytes. A group counts as an
    attribute header only when its first four bytes are 80 00 00 00 (the
    little-endian attribute type 0x80); the following four bytes are read as
    the little-endian attribute length.

    Returns a list with one dict per attribute (see Get80hPattern).
    """
    data_attribute_type = b'\x80\x00\x00\x00'
    # Two groups are needed to read the resident flag in the second group.
    min_attribute_length = 2 * group_size

    groups = [sector_data[i:i + group_size]
              for i in range(0, len(sector_data), group_size)]
    matches = []
    for i, current_group in enumerate(groups):
        if len(current_group) < group_size:
            continue
        # Compare the whole 4-byte type, not just the first byte, so that an
        # arbitrary value whose low byte happens to be 0x80 is not reported.
        if current_group[:4] != data_attribute_type:
            continue
        if i + 1 >= len(groups):
            print(f"警告: 当前组后不足两组,跳过偏移量 {i * group_size:04X}h")
            continue

        # Bytes 4..7 of the header hold the total attribute length.
        attribute_length = int.from_bytes(current_group[4:8], byteorder='little')
        if attribute_length < min_attribute_length:
            # Too short to be a real attribute; would yield an empty entry.
            continue

        # Number of groups to take, rounded up to a whole group.
        total_groups = (attribute_length + group_size - 1) // group_size
        end_idx = i + total_groups
        if end_idx > len(groups):
            print(f"警告: 属性越界,跳过偏移量 {i * group_size:04X}h")
            continue

        raw_sequence = groups[i:end_idx]
        # Render each group as "31 7a 00 ee 0b 00 00 00".
        formatted_sequence = [' '.join(f"{byte:02x}" for byte in group)
                              for group in raw_sequence]
        # Lowest bit of the first byte of the second group: 0 means resident.
        is_resident = (raw_sequence[1][0] & 0x01) == 0x00

        matches.append({
            'start_byte': base_byte,
            'offset': i * group_size,
            'sequence': formatted_sequence,
            'is_resident': is_resident,
            'total_groups': total_groups,
            'attribute_length': attribute_length
        })
    return matches


def Get80hPattern(sector_number, volume_letter="Z"):
    """
    Read one sector of a volume and return the 0x80 attributes found in it.

    Parameters:
        sector_number (int): number of the 512-byte sector to read.
        volume_letter (str): drive letter of the volume whose raw device is
            opened; defaults to "Z".

    Returns:
        list: one dict per attribute found, with the keys
            'start_byte': sector_number * 512,
            'offset': byte offset of the attribute inside the sector,
            'sequence': the attribute as 8-byte groups, each formatted as a
                        string "xx xx xx ...",
            'is_resident': True when the resident flag bit is 0,
            'total_groups': number of 8-byte groups in 'sequence',
            'attribute_length': attribute length in bytes.
        An empty list is returned when the device cannot be read; the error
        is printed rather than raised.

    NOTE: only a single 512-byte sector is examined, so an attribute that
    extends past the end of that sector is skipped with a warning.
    """
    drive_path = fr"\\.\{volume_letter}:"
    SECTOR_SIZE = 512
    GROUP_SIZE = 8  # bytes per group
    results = []
    try:
        with open(drive_path, 'rb') as disk:
            disk.seek(sector_number * SECTOR_SIZE)
            sector_data = disk.read(SECTOR_SIZE)
    except PermissionError:
        print("错误: 需要管理员权限访问磁盘设备")
        return results
    except Exception as e:
        print(f"发生错误: {str(e)}")
        return results

    if not sector_data or len(sector_data) < GROUP_SIZE:
        print(f"错误: 无法读取扇区 {sector_number}")
        return results

    return _scan_80h_attributes(sector_data, sector_number * SECTOR_SIZE, GROUP_SIZE)
def GetFile80hPattern(file_path):
    """
    Return the 0x80 attribute information found in a file's MFT entry.

    The file's MFT entry number is looked up, converted to a starting sector
    and that sector is scanned with Get80hPattern.

    Parameters:
        file_path (str): path of the file to inspect.

    Returns:
        list | None: the list produced by Get80hPattern, or None when any
        step raises (the error is printed, not propagated).
    """
    # Take the drive from the path itself and fall back to the absolute path,
    # so a path without a drive prefix does not end up as the "volume letter".
    drive = (os.path.splitdrive(file_path)[0]
             or os.path.splitdrive(os.path.abspath(file_path))[0])
    volume_letter = drive.rstrip(':')
    try:
        mft_entry_value = GetFileMftEntry(file_path)
        start_sector = CalculateFileMftStartSector(mft_entry_value, volume_letter)
        return Get80hPattern(start_sector, volume_letter)
    except Exception as e:
        # Best-effort API: report the failure and signal it with None.
        print(f"❌ Error: {e}")
        return None
# if __name__ == '__main__':
# data = GetFile80hPattern(r"Z:\hello.txt")
# print(data)
def ExtractSequenceHexValues(file80h_pattern):
    """
    Flatten the 'sequence' strings of all entries into one list of tokens.

    Parameters:
        file80h_pattern (list): dicts that may carry a 'sequence' key holding
            a list of whitespace-separated hex strings.

    Returns:
        list: every hex token of every 'sequence', in original order. Entries
        without a 'sequence' key contribute nothing.
    """
    return [
        token
        for entry in file80h_pattern
        if 'sequence' in entry
        for hex_line in entry['sequence']
        for token in hex_line.split()
    ]
def ExportDataRunList(data_run):
    """
    Split a flat Data Run byte list into one list per Data Run.

    Each run starts with a header byte whose low nibble is the size of the
    length field and whose high nibble is the size of the offset field; the
    run occupies ``1 + length_size + offset_size`` tokens.

    Parameters:
        data_run (list): hex strings, one per byte, holding the run list.

    Returns:
        list: one list of hex strings per complete Data Run, in order.
        Parsing stops at the first '00' header, at a header with a zero-sized
        field, at an unparsable header, or at a run cut off by the end of the
        data (an incomplete run is not returned).

    NOTE(review): a header whose offset size is 0 is how NTFS encodes a
    sparse run; such a header stops parsing here - confirm this is intended.
    """
    result = []
    pos = 0
    total = len(data_run)
    while pos < total:
        current_byte = data_run[pos]
        if current_byte == '00':
            # Terminator of the run list.
            break
        try:
            header = int(current_byte, 16)
        except (TypeError, ValueError) as e:
            print(f"❌ 解析失败,位置 {pos}{e}")
            break

        offset_size = (header >> 4) & 0x0F
        length_size = header & 0x0F
        if length_size == 0 or offset_size == 0:
            print(f"⚠️ 无效的字段长度,跳过位置 {pos}")
            break

        # Header byte plus both fields.
        run_length = 1 + length_size + offset_size
        if pos + run_length > total:
            # The data ends in the middle of this run; a partial fragment
            # cannot be decoded, so it is dropped instead of being emitted.
            print(f"⚠️ Data Run 数据不完整,停止解析,位置 {pos}")
            break

        result.append(data_run[pos:pos + run_length])
        pos += run_length
    return result
def hex_list_to_int(lst, byteorder='little'):
    """
    Convert a list of hex byte strings into a single integer.

    With byteorder 'little' the list is reversed before the digits are
    joined; any other value keeps the list order as given.
    """
    ordered = reversed(lst) if byteorder == 'little' else lst
    digits = []
    for token in ordered:
        # Normalise every token to at least two lower-case hex digits.
        digits.append(format(int(token, 16), '02x'))
    return int(''.join(digits), 16)
def parse_data_run(data_run, previous_cluster=0, cluster_size=512, sectors_per_cluster=8):
    """
    Decode a single NTFS Data Run into cluster and byte positions.

    Layout of a run: a header byte, then the run length field, then the
    offset field. The header's low nibble is the byte size of the length
    field and its high nibble is the byte size of the offset field. The
    length is an unsigned little-endian cluster count; the offset is a
    signed little-endian cluster delta relative to the start of the previous
    run (relative to 0 for the first run).

    Parameters:
        data_run (list): hex strings, one per byte, of one Data Run.
        previous_cluster (int): starting cluster of the previous run, or 0
            for the first run.
        cluster_size (int): despite its name this value is multiplied by
            sectors_per_cluster, i.e. it acts as bytes per sector
            (default 512).
        sectors_per_cluster (int): sectors per cluster; the default 8 is the
            value that used to be hard-coded.

    Returns:
        dict | None: 'starting_byte', 'ending_byte' (inclusive),
        'byte_length', 'starting_cluster' and 'run_length_clusters'; None for
        an empty run, a '00' terminator, a header with a zero-sized field, or
        a run shorter than its header announces.

    Raises:
        ValueError: a token is not a valid hex byte.
    """
    if not data_run or data_run[0] == '00':
        return None

    header = int(data_run[0], 16)
    length_size = header & 0x0F
    offset_size = (header >> 4) & 0x0F
    if length_size == 0 or offset_size == 0:
        # No length, or no on-disk location to report.
        return None

    length_end = 1 + length_size
    offset_end = length_end + offset_size
    if len(data_run) < offset_end:
        # Truncated run: the fields cannot be decoded reliably.
        return None

    def field_to_int(tokens, signed):
        raw = bytes(int(token, 16) for token in tokens)
        return int.from_bytes(raw, byteorder='little', signed=signed)

    run_length = field_to_int(data_run[1:length_end], signed=False)
    # The offset is signed: a later run may lie before the previous one.
    offset = field_to_int(data_run[length_end:offset_end], signed=True)

    starting_cluster = previous_cluster + offset

    # Convert cluster numbers to byte positions.
    bytes_per_cluster = sectors_per_cluster * cluster_size
    starting_byte = starting_cluster * bytes_per_cluster
    byte_length = run_length * bytes_per_cluster
    ending_byte = starting_byte + byte_length - 1

    return {
        "starting_byte": starting_byte,
        "ending_byte": ending_byte,
        "byte_length": byte_length,
        "starting_cluster": starting_cluster,
        "run_length_clusters": run_length
    }
def ParseMultipleDataRuns(fragments, cluster_size=512):
    """
    Decode a list of Data Run fragments with parse_data_run.

    Parameters:
        fragments (list): Data Runs, each a list of hex byte strings.
        cluster_size (int): forwarded unchanged to parse_data_run
            (default 512).

    Returns:
        list: the dict returned by parse_data_run for every fragment that
        produced a truthy result, in input order.
    """
    parsed_runs = []
    # The "starting_cluster" of the latest parsed run is handed to the next
    # call as its previous_cluster; the first call gets 0.
    reference_cluster = 0
    for run in fragments:
        info = parse_data_run(run, reference_cluster, cluster_size)
        if not info:
            continue
        parsed_runs.append(info)
        reference_cluster = info["starting_cluster"]
    return parsed_runs
def GetFragmentData(file80h_pattern):
    """
    Turn the first 0x80 attribute of a pattern list into byte extents.

    Parameters:
        file80h_pattern (list): result of Get80hPattern / GetFile80hPattern;
            only the first entry is used.

    Returns:
        list: for a resident attribute, a single dict
        ``{'starting_byte', 'byte_length'}`` locating the content; for a
        non-resident attribute, the dicts produced by ParseMultipleDataRuns
        for its Data Runs. An empty list is returned when the input is empty,
        not a list, or too short to contain the required header fields.
    """
    if not file80h_pattern or not isinstance(file80h_pattern, list):
        return []

    first = file80h_pattern[0]
    if first.get('is_resident'):
        sequence = first.get('sequence') or []
        if len(sequence) < 3:
            return []
        # Third 8-byte group = attribute bytes 16..23: a 4-byte content
        # length followed by a 2-byte content offset, both little-endian.
        header_bytes = sequence[2].split()
        if len(header_bytes) < 6:
            return []
        content_length = int(''.join(reversed(header_bytes[0:4])), 16)
        # Only bytes 20..21 are the offset; bytes 22..23 are other header
        # fields and must not be merged into it.
        content_offset = int(''.join(reversed(header_bytes[4:6])), 16)
        file_offset = first.get('start_byte') + first.get('offset') + content_offset
        return [{
            'starting_byte': file_offset,
            'byte_length': content_length
        }]

    # Non-resident: restrict to the first attribute so bytes of later matches
    # can never be parsed as part of its run list.
    sequence_list = ExtractSequenceHexValues(file80h_pattern[:1])
    if len(sequence_list) < 34:
        return []
    # Attribute bytes 32..33: little-endian offset of the Data Run list.
    data_run_offset = int(''.join(reversed(sequence_list[32:34])), 16)
    fragments = ExportDataRunList(sequence_list[data_run_offset:])
    return ParseMultipleDataRuns(fragments)
# if __name__ == '__main__':
# arri80_data = GetFile80hPattern(r"Z:\hello.txt")
# data = GetFragmentData(arri80_data)
# print(data)